1use util::ResultExt;
2
3use super::*;
4
5impl Database {
6 /// Returns the count of all projects, excluding ones marked as admin.
7 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
8 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
9 enum QueryAs {
10 Count,
11 }
12
13 self.transaction(|tx| async move {
14 Ok(project::Entity::find()
15 .select_only()
16 .column_as(project::Column::Id.count(), QueryAs::Count)
17 .inner_join(user::Entity)
18 .filter(user::Column::Admin.eq(false))
19 .into_values::<_, QueryAs>()
20 .one(&*tx)
21 .await?
22 .unwrap_or(0i64) as usize)
23 })
24 .await
25 }
26
27 /// Shares a project with the given room.
28 pub async fn share_project(
29 &self,
30 room_id: RoomId,
31 connection: ConnectionId,
32 worktrees: &[proto::WorktreeMetadata],
33 dev_server_project_id: Option<DevServerProjectId>,
34 ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
35 self.room_transaction(room_id, |tx| async move {
36 let participant = room_participant::Entity::find()
37 .filter(
38 Condition::all()
39 .add(
40 room_participant::Column::AnsweringConnectionId
41 .eq(connection.id as i32),
42 )
43 .add(
44 room_participant::Column::AnsweringConnectionServerId
45 .eq(connection.owner_id as i32),
46 ),
47 )
48 .one(&*tx)
49 .await?
50 .ok_or_else(|| anyhow!("could not find participant"))?;
51 if participant.room_id != room_id {
52 return Err(anyhow!("shared project on unexpected room"))?;
53 }
54 if !participant
55 .role
56 .unwrap_or(ChannelRole::Member)
57 .can_edit_projects()
58 {
59 return Err(anyhow!("guests cannot share projects"))?;
60 }
61
62 if let Some(dev_server_project_id) = dev_server_project_id {
63 let project = project::Entity::find()
64 .filter(project::Column::DevServerProjectId.eq(Some(dev_server_project_id)))
65 .one(&*tx)
66 .await?
67 .ok_or_else(|| anyhow!("no remote project"))?;
68
69 let (_, dev_server) = dev_server_project::Entity::find_by_id(dev_server_project_id)
70 .find_also_related(dev_server::Entity)
71 .one(&*tx)
72 .await?
73 .ok_or_else(|| anyhow!("no dev_server_project"))?;
74
75 if !dev_server.is_some_and(|dev_server| dev_server.user_id == participant.user_id) {
76 return Err(anyhow!("not your dev server"))?;
77 }
78
79 if project.room_id.is_some() {
80 return Err(anyhow!("project already shared"))?;
81 };
82
83 let project = project::Entity::update(project::ActiveModel {
84 room_id: ActiveValue::Set(Some(room_id)),
85 ..project.into_active_model()
86 })
87 .exec(&*tx)
88 .await?;
89
90 let room = self.get_room(room_id, &tx).await?;
91 return Ok((project.id, room));
92 }
93
94 let project = project::ActiveModel {
95 room_id: ActiveValue::set(Some(participant.room_id)),
96 host_user_id: ActiveValue::set(Some(participant.user_id)),
97 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
98 host_connection_server_id: ActiveValue::set(Some(ServerId(
99 connection.owner_id as i32,
100 ))),
101 id: ActiveValue::NotSet,
102 hosted_project_id: ActiveValue::Set(None),
103 dev_server_project_id: ActiveValue::Set(None),
104 }
105 .insert(&*tx)
106 .await?;
107
108 if !worktrees.is_empty() {
109 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
110 worktree::ActiveModel {
111 id: ActiveValue::set(worktree.id as i64),
112 project_id: ActiveValue::set(project.id),
113 abs_path: ActiveValue::set(worktree.abs_path.clone()),
114 root_name: ActiveValue::set(worktree.root_name.clone()),
115 visible: ActiveValue::set(worktree.visible),
116 scan_id: ActiveValue::set(0),
117 completed_scan_id: ActiveValue::set(0),
118 }
119 }))
120 .exec(&*tx)
121 .await?;
122 }
123
124 project_collaborator::ActiveModel {
125 project_id: ActiveValue::set(project.id),
126 connection_id: ActiveValue::set(connection.id as i32),
127 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
128 user_id: ActiveValue::set(participant.user_id),
129 replica_id: ActiveValue::set(ReplicaId(0)),
130 is_host: ActiveValue::set(true),
131 ..Default::default()
132 }
133 .insert(&*tx)
134 .await?;
135
136 let room = self.get_room(room_id, &tx).await?;
137 Ok((project.id, room))
138 })
139 .await
140 }
141
142 pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
143 self.weak_transaction(|tx| async move {
144 project::Entity::delete_by_id(project_id).exec(&*tx).await?;
145 Ok(())
146 })
147 .await
148 }
149
150 /// Unshares the given project.
151 pub async fn unshare_project(
152 &self,
153 project_id: ProjectId,
154 connection: ConnectionId,
155 user_id: Option<UserId>,
156 ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
157 self.project_transaction(project_id, |tx| async move {
158 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
159 let project = project::Entity::find_by_id(project_id)
160 .one(&*tx)
161 .await?
162 .ok_or_else(|| anyhow!("project not found"))?;
163 let room = if let Some(room_id) = project.room_id {
164 Some(self.get_room(room_id, &tx).await?)
165 } else {
166 None
167 };
168 if project.host_connection()? == connection {
169 return Ok((true, room, guest_connection_ids));
170 }
171 if let Some(dev_server_project_id) = project.dev_server_project_id {
172 if let Some(user_id) = user_id {
173 if user_id
174 != self
175 .owner_for_dev_server_project(dev_server_project_id, &tx)
176 .await?
177 {
178 Err(anyhow!("cannot unshare a project hosted by another user"))?
179 }
180 project::Entity::update(project::ActiveModel {
181 room_id: ActiveValue::Set(None),
182 ..project.into_active_model()
183 })
184 .exec(&*tx)
185 .await?;
186 return Ok((false, room, guest_connection_ids));
187 }
188 }
189
190 Err(anyhow!("cannot unshare a project hosted by another user"))?
191 })
192 .await
193 }
194
195 /// Updates the worktrees associated with the given project.
196 pub async fn update_project(
197 &self,
198 project_id: ProjectId,
199 connection: ConnectionId,
200 worktrees: &[proto::WorktreeMetadata],
201 ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
202 self.project_transaction(project_id, |tx| async move {
203 let project = project::Entity::find_by_id(project_id)
204 .filter(
205 Condition::all()
206 .add(project::Column::HostConnectionId.eq(connection.id as i32))
207 .add(
208 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
209 ),
210 )
211 .one(&*tx)
212 .await?
213 .ok_or_else(|| anyhow!("no such project"))?;
214
215 self.update_project_worktrees(project.id, worktrees, &tx)
216 .await?;
217
218 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
219
220 let room = if let Some(room_id) = project.room_id {
221 Some(self.get_room(room_id, &tx).await?)
222 } else {
223 None
224 };
225
226 Ok((room, guest_connection_ids))
227 })
228 .await
229 }
230
231 pub(in crate::db) async fn update_project_worktrees(
232 &self,
233 project_id: ProjectId,
234 worktrees: &[proto::WorktreeMetadata],
235 tx: &DatabaseTransaction,
236 ) -> Result<()> {
237 if !worktrees.is_empty() {
238 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
239 id: ActiveValue::set(worktree.id as i64),
240 project_id: ActiveValue::set(project_id),
241 abs_path: ActiveValue::set(worktree.abs_path.clone()),
242 root_name: ActiveValue::set(worktree.root_name.clone()),
243 visible: ActiveValue::set(worktree.visible),
244 scan_id: ActiveValue::set(0),
245 completed_scan_id: ActiveValue::set(0),
246 }))
247 .on_conflict(
248 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
249 .update_column(worktree::Column::RootName)
250 .to_owned(),
251 )
252 .exec(tx)
253 .await?;
254 }
255
256 worktree::Entity::delete_many()
257 .filter(worktree::Column::ProjectId.eq(project_id).and(
258 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
259 ))
260 .exec(tx)
261 .await?;
262
263 Ok(())
264 }
265
266 pub async fn update_worktree(
267 &self,
268 update: &proto::UpdateWorktree,
269 connection: ConnectionId,
270 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
271 let project_id = ProjectId::from_proto(update.project_id);
272 let worktree_id = update.worktree_id as i64;
273 self.project_transaction(project_id, |tx| async move {
274 // Ensure the update comes from the host.
275 let _project = project::Entity::find_by_id(project_id)
276 .filter(
277 Condition::all()
278 .add(project::Column::HostConnectionId.eq(connection.id as i32))
279 .add(
280 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
281 ),
282 )
283 .one(&*tx)
284 .await?
285 .ok_or_else(|| anyhow!("no such project"))?;
286
287 // Update metadata.
288 worktree::Entity::update(worktree::ActiveModel {
289 id: ActiveValue::set(worktree_id),
290 project_id: ActiveValue::set(project_id),
291 root_name: ActiveValue::set(update.root_name.clone()),
292 scan_id: ActiveValue::set(update.scan_id as i64),
293 completed_scan_id: if update.is_last_update {
294 ActiveValue::set(update.scan_id as i64)
295 } else {
296 ActiveValue::default()
297 },
298 abs_path: ActiveValue::set(update.abs_path.clone()),
299 ..Default::default()
300 })
301 .exec(&*tx)
302 .await?;
303
304 if !update.updated_entries.is_empty() {
305 worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
306 let mtime = entry.mtime.clone().unwrap_or_default();
307 worktree_entry::ActiveModel {
308 project_id: ActiveValue::set(project_id),
309 worktree_id: ActiveValue::set(worktree_id),
310 id: ActiveValue::set(entry.id as i64),
311 is_dir: ActiveValue::set(entry.is_dir),
312 path: ActiveValue::set(entry.path.clone()),
313 inode: ActiveValue::set(entry.inode as i64),
314 mtime_seconds: ActiveValue::set(mtime.seconds as i64),
315 mtime_nanos: ActiveValue::set(mtime.nanos as i32),
316 is_symlink: ActiveValue::set(entry.is_symlink),
317 is_ignored: ActiveValue::set(entry.is_ignored),
318 is_external: ActiveValue::set(entry.is_external),
319 git_status: ActiveValue::set(entry.git_status.map(|status| status as i64)),
320 is_deleted: ActiveValue::set(false),
321 scan_id: ActiveValue::set(update.scan_id as i64),
322 is_fifo: ActiveValue::set(entry.is_fifo),
323 }
324 }))
325 .on_conflict(
326 OnConflict::columns([
327 worktree_entry::Column::ProjectId,
328 worktree_entry::Column::WorktreeId,
329 worktree_entry::Column::Id,
330 ])
331 .update_columns([
332 worktree_entry::Column::IsDir,
333 worktree_entry::Column::Path,
334 worktree_entry::Column::Inode,
335 worktree_entry::Column::MtimeSeconds,
336 worktree_entry::Column::MtimeNanos,
337 worktree_entry::Column::IsSymlink,
338 worktree_entry::Column::IsIgnored,
339 worktree_entry::Column::GitStatus,
340 worktree_entry::Column::ScanId,
341 ])
342 .to_owned(),
343 )
344 .exec(&*tx)
345 .await?;
346 }
347
348 if !update.removed_entries.is_empty() {
349 worktree_entry::Entity::update_many()
350 .filter(
351 worktree_entry::Column::ProjectId
352 .eq(project_id)
353 .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
354 .and(
355 worktree_entry::Column::Id
356 .is_in(update.removed_entries.iter().map(|id| *id as i64)),
357 ),
358 )
359 .set(worktree_entry::ActiveModel {
360 is_deleted: ActiveValue::Set(true),
361 scan_id: ActiveValue::Set(update.scan_id as i64),
362 ..Default::default()
363 })
364 .exec(&*tx)
365 .await?;
366 }
367
368 if !update.updated_repositories.is_empty() {
369 worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
370 |repository| worktree_repository::ActiveModel {
371 project_id: ActiveValue::set(project_id),
372 worktree_id: ActiveValue::set(worktree_id),
373 work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
374 scan_id: ActiveValue::set(update.scan_id as i64),
375 branch: ActiveValue::set(repository.branch.clone()),
376 is_deleted: ActiveValue::set(false),
377 },
378 ))
379 .on_conflict(
380 OnConflict::columns([
381 worktree_repository::Column::ProjectId,
382 worktree_repository::Column::WorktreeId,
383 worktree_repository::Column::WorkDirectoryId,
384 ])
385 .update_columns([
386 worktree_repository::Column::ScanId,
387 worktree_repository::Column::Branch,
388 ])
389 .to_owned(),
390 )
391 .exec(&*tx)
392 .await?;
393 }
394
395 if !update.removed_repositories.is_empty() {
396 worktree_repository::Entity::update_many()
397 .filter(
398 worktree_repository::Column::ProjectId
399 .eq(project_id)
400 .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
401 .and(
402 worktree_repository::Column::WorkDirectoryId
403 .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
404 ),
405 )
406 .set(worktree_repository::ActiveModel {
407 is_deleted: ActiveValue::Set(true),
408 scan_id: ActiveValue::Set(update.scan_id as i64),
409 ..Default::default()
410 })
411 .exec(&*tx)
412 .await?;
413 }
414
415 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
416 Ok(connection_ids)
417 })
418 .await
419 }
420
421 /// Updates the diagnostic summary for the given connection.
422 pub async fn update_diagnostic_summary(
423 &self,
424 update: &proto::UpdateDiagnosticSummary,
425 connection: ConnectionId,
426 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
427 let project_id = ProjectId::from_proto(update.project_id);
428 let worktree_id = update.worktree_id as i64;
429 self.project_transaction(project_id, |tx| async move {
430 let summary = update
431 .summary
432 .as_ref()
433 .ok_or_else(|| anyhow!("invalid summary"))?;
434
435 // Ensure the update comes from the host.
436 let project = project::Entity::find_by_id(project_id)
437 .one(&*tx)
438 .await?
439 .ok_or_else(|| anyhow!("no such project"))?;
440 if project.host_connection()? != connection {
441 return Err(anyhow!("can't update a project hosted by someone else"))?;
442 }
443
444 // Update summary.
445 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
446 project_id: ActiveValue::set(project_id),
447 worktree_id: ActiveValue::set(worktree_id),
448 path: ActiveValue::set(summary.path.clone()),
449 language_server_id: ActiveValue::set(summary.language_server_id as i64),
450 error_count: ActiveValue::set(summary.error_count as i32),
451 warning_count: ActiveValue::set(summary.warning_count as i32),
452 })
453 .on_conflict(
454 OnConflict::columns([
455 worktree_diagnostic_summary::Column::ProjectId,
456 worktree_diagnostic_summary::Column::WorktreeId,
457 worktree_diagnostic_summary::Column::Path,
458 ])
459 .update_columns([
460 worktree_diagnostic_summary::Column::LanguageServerId,
461 worktree_diagnostic_summary::Column::ErrorCount,
462 worktree_diagnostic_summary::Column::WarningCount,
463 ])
464 .to_owned(),
465 )
466 .exec(&*tx)
467 .await?;
468
469 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
470 Ok(connection_ids)
471 })
472 .await
473 }
474
475 /// Starts the language server for the given connection.
476 pub async fn start_language_server(
477 &self,
478 update: &proto::StartLanguageServer,
479 connection: ConnectionId,
480 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
481 let project_id = ProjectId::from_proto(update.project_id);
482 self.project_transaction(project_id, |tx| async move {
483 let server = update
484 .server
485 .as_ref()
486 .ok_or_else(|| anyhow!("invalid language server"))?;
487
488 // Ensure the update comes from the host.
489 let project = project::Entity::find_by_id(project_id)
490 .one(&*tx)
491 .await?
492 .ok_or_else(|| anyhow!("no such project"))?;
493 if project.host_connection()? != connection {
494 return Err(anyhow!("can't update a project hosted by someone else"))?;
495 }
496
497 // Add the newly-started language server.
498 language_server::Entity::insert(language_server::ActiveModel {
499 project_id: ActiveValue::set(project_id),
500 id: ActiveValue::set(server.id as i64),
501 name: ActiveValue::set(server.name.clone()),
502 })
503 .on_conflict(
504 OnConflict::columns([
505 language_server::Column::ProjectId,
506 language_server::Column::Id,
507 ])
508 .update_column(language_server::Column::Name)
509 .to_owned(),
510 )
511 .exec(&*tx)
512 .await?;
513
514 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
515 Ok(connection_ids)
516 })
517 .await
518 }
519
520 /// Updates the worktree settings for the given connection.
521 pub async fn update_worktree_settings(
522 &self,
523 update: &proto::UpdateWorktreeSettings,
524 connection: ConnectionId,
525 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
526 let project_id = ProjectId::from_proto(update.project_id);
527 self.project_transaction(project_id, |tx| async move {
528 // Ensure the update comes from the host.
529 let project = project::Entity::find_by_id(project_id)
530 .one(&*tx)
531 .await?
532 .ok_or_else(|| anyhow!("no such project"))?;
533 if project.host_connection()? != connection {
534 return Err(anyhow!("can't update a project hosted by someone else"))?;
535 }
536
537 if let Some(content) = &update.content {
538 worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
539 project_id: ActiveValue::Set(project_id),
540 worktree_id: ActiveValue::Set(update.worktree_id as i64),
541 path: ActiveValue::Set(update.path.clone()),
542 content: ActiveValue::Set(content.clone()),
543 })
544 .on_conflict(
545 OnConflict::columns([
546 worktree_settings_file::Column::ProjectId,
547 worktree_settings_file::Column::WorktreeId,
548 worktree_settings_file::Column::Path,
549 ])
550 .update_column(worktree_settings_file::Column::Content)
551 .to_owned(),
552 )
553 .exec(&*tx)
554 .await?;
555 } else {
556 worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
557 project_id: ActiveValue::Set(project_id),
558 worktree_id: ActiveValue::Set(update.worktree_id as i64),
559 path: ActiveValue::Set(update.path.clone()),
560 ..Default::default()
561 })
562 .exec(&*tx)
563 .await?;
564 }
565
566 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
567 Ok(connection_ids)
568 })
569 .await
570 }
571
572 /// Adds the given connection to the specified hosted project
573 pub async fn join_hosted_project(
574 &self,
575 id: ProjectId,
576 user_id: UserId,
577 connection: ConnectionId,
578 ) -> Result<(Project, ReplicaId)> {
579 self.transaction(|tx| async move {
580 let (project, hosted_project) = project::Entity::find_by_id(id)
581 .find_also_related(hosted_project::Entity)
582 .one(&*tx)
583 .await?
584 .ok_or_else(|| anyhow!("hosted project is no longer shared"))?;
585
586 let Some(hosted_project) = hosted_project else {
587 return Err(anyhow!("project is not hosted"))?;
588 };
589
590 let channel = channel::Entity::find_by_id(hosted_project.channel_id)
591 .one(&*tx)
592 .await?
593 .ok_or_else(|| anyhow!("no such channel"))?;
594
595 let role = self
596 .check_user_is_channel_participant(&channel, user_id, &tx)
597 .await?;
598
599 self.join_project_internal(project, user_id, connection, role, &tx)
600 .await
601 })
602 .await
603 }
604
605 pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
606 self.transaction(|tx| async move {
607 Ok(project::Entity::find_by_id(id)
608 .one(&*tx)
609 .await?
610 .ok_or_else(|| anyhow!("no such project"))?)
611 })
612 .await
613 }
614
615 pub async fn find_dev_server_project(&self, id: DevServerProjectId) -> Result<project::Model> {
616 self.transaction(|tx| async move {
617 Ok(project::Entity::find()
618 .filter(project::Column::DevServerProjectId.eq(id))
619 .one(&*tx)
620 .await?
621 .ok_or_else(|| anyhow!("no such project"))?)
622 })
623 .await
624 }
625
626 /// Adds the given connection to the specified project
627 /// in the current room.
628 pub async fn join_project(
629 &self,
630 project_id: ProjectId,
631 connection: ConnectionId,
632 user_id: UserId,
633 ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
634 self.project_transaction(project_id, |tx| async move {
635 let (project, role) = self
636 .access_project(
637 project_id,
638 connection,
639 PrincipalId::UserId(user_id),
640 Capability::ReadOnly,
641 &tx,
642 )
643 .await?;
644 self.join_project_internal(project, user_id, connection, role, &tx)
645 .await
646 })
647 .await
648 }
649
650 async fn join_project_internal(
651 &self,
652 project: project::Model,
653 user_id: UserId,
654 connection: ConnectionId,
655 role: ChannelRole,
656 tx: &DatabaseTransaction,
657 ) -> Result<(Project, ReplicaId)> {
658 let mut collaborators = project
659 .find_related(project_collaborator::Entity)
660 .all(tx)
661 .await?;
662 let replica_ids = collaborators
663 .iter()
664 .map(|c| c.replica_id)
665 .collect::<HashSet<_>>();
666 let mut replica_id = ReplicaId(1);
667 while replica_ids.contains(&replica_id) {
668 replica_id.0 += 1;
669 }
670 let new_collaborator = project_collaborator::ActiveModel {
671 project_id: ActiveValue::set(project.id),
672 connection_id: ActiveValue::set(connection.id as i32),
673 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
674 user_id: ActiveValue::set(user_id),
675 replica_id: ActiveValue::set(replica_id),
676 is_host: ActiveValue::set(false),
677 ..Default::default()
678 }
679 .insert(tx)
680 .await?;
681 collaborators.push(new_collaborator);
682
683 let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
684 let mut worktrees = db_worktrees
685 .into_iter()
686 .map(|db_worktree| {
687 (
688 db_worktree.id as u64,
689 Worktree {
690 id: db_worktree.id as u64,
691 abs_path: db_worktree.abs_path,
692 root_name: db_worktree.root_name,
693 visible: db_worktree.visible,
694 entries: Default::default(),
695 repository_entries: Default::default(),
696 diagnostic_summaries: Default::default(),
697 settings_files: Default::default(),
698 scan_id: db_worktree.scan_id as u64,
699 completed_scan_id: db_worktree.completed_scan_id as u64,
700 },
701 )
702 })
703 .collect::<BTreeMap<_, _>>();
704
705 // Populate worktree entries.
706 {
707 let mut db_entries = worktree_entry::Entity::find()
708 .filter(
709 Condition::all()
710 .add(worktree_entry::Column::ProjectId.eq(project.id))
711 .add(worktree_entry::Column::IsDeleted.eq(false)),
712 )
713 .stream(tx)
714 .await?;
715 while let Some(db_entry) = db_entries.next().await {
716 let db_entry = db_entry?;
717 if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
718 worktree.entries.push(proto::Entry {
719 id: db_entry.id as u64,
720 is_dir: db_entry.is_dir,
721 path: db_entry.path,
722 inode: db_entry.inode as u64,
723 mtime: Some(proto::Timestamp {
724 seconds: db_entry.mtime_seconds as u64,
725 nanos: db_entry.mtime_nanos as u32,
726 }),
727 is_symlink: db_entry.is_symlink,
728 is_ignored: db_entry.is_ignored,
729 is_external: db_entry.is_external,
730 git_status: db_entry.git_status.map(|status| status as i32),
731 // This is only used in the summarization backlog, so if it's None,
732 // that just means we won't be able to detect when to resummarize
733 // based on total number of backlogged bytes - instead, we'd go
734 // on number of files only. That shouldn't be a huge deal in practice.
735 size: None,
736 is_fifo: db_entry.is_fifo,
737 });
738 }
739 }
740 }
741
742 // Populate repository entries.
743 {
744 let mut db_repository_entries = worktree_repository::Entity::find()
745 .filter(
746 Condition::all()
747 .add(worktree_repository::Column::ProjectId.eq(project.id))
748 .add(worktree_repository::Column::IsDeleted.eq(false)),
749 )
750 .stream(tx)
751 .await?;
752 while let Some(db_repository_entry) = db_repository_entries.next().await {
753 let db_repository_entry = db_repository_entry?;
754 if let Some(worktree) = worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
755 {
756 worktree.repository_entries.insert(
757 db_repository_entry.work_directory_id as u64,
758 proto::RepositoryEntry {
759 work_directory_id: db_repository_entry.work_directory_id as u64,
760 branch: db_repository_entry.branch,
761 },
762 );
763 }
764 }
765 }
766
767 // Populate worktree diagnostic summaries.
768 {
769 let mut db_summaries = worktree_diagnostic_summary::Entity::find()
770 .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
771 .stream(tx)
772 .await?;
773 while let Some(db_summary) = db_summaries.next().await {
774 let db_summary = db_summary?;
775 if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
776 worktree
777 .diagnostic_summaries
778 .push(proto::DiagnosticSummary {
779 path: db_summary.path,
780 language_server_id: db_summary.language_server_id as u64,
781 error_count: db_summary.error_count as u32,
782 warning_count: db_summary.warning_count as u32,
783 });
784 }
785 }
786 }
787
788 // Populate worktree settings files
789 {
790 let mut db_settings_files = worktree_settings_file::Entity::find()
791 .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
792 .stream(tx)
793 .await?;
794 while let Some(db_settings_file) = db_settings_files.next().await {
795 let db_settings_file = db_settings_file?;
796 if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
797 worktree.settings_files.push(WorktreeSettingsFile {
798 path: db_settings_file.path,
799 content: db_settings_file.content,
800 });
801 }
802 }
803 }
804
805 // Populate language servers.
806 let language_servers = project
807 .find_related(language_server::Entity)
808 .all(tx)
809 .await?;
810
811 let project = Project {
812 id: project.id,
813 role,
814 collaborators: collaborators
815 .into_iter()
816 .map(|collaborator| ProjectCollaborator {
817 connection_id: collaborator.connection(),
818 user_id: collaborator.user_id,
819 replica_id: collaborator.replica_id,
820 is_host: collaborator.is_host,
821 })
822 .collect(),
823 worktrees,
824 language_servers: language_servers
825 .into_iter()
826 .map(|language_server| proto::LanguageServer {
827 id: language_server.id as u64,
828 name: language_server.name,
829 })
830 .collect(),
831 dev_server_project_id: project.dev_server_project_id,
832 };
833 Ok((project, replica_id as ReplicaId))
834 }
835
836 pub async fn leave_hosted_project(
837 &self,
838 project_id: ProjectId,
839 connection: ConnectionId,
840 ) -> Result<LeftProject> {
841 self.transaction(|tx| async move {
842 let result = project_collaborator::Entity::delete_many()
843 .filter(
844 Condition::all()
845 .add(project_collaborator::Column::ProjectId.eq(project_id))
846 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
847 .add(
848 project_collaborator::Column::ConnectionServerId
849 .eq(connection.owner_id as i32),
850 ),
851 )
852 .exec(&*tx)
853 .await?;
854 if result.rows_affected == 0 {
855 return Err(anyhow!("not in the project"))?;
856 }
857
858 let project = project::Entity::find_by_id(project_id)
859 .one(&*tx)
860 .await?
861 .ok_or_else(|| anyhow!("no such project"))?;
862 let collaborators = project
863 .find_related(project_collaborator::Entity)
864 .all(&*tx)
865 .await?;
866 let connection_ids = collaborators
867 .into_iter()
868 .map(|collaborator| collaborator.connection())
869 .collect();
870 Ok(LeftProject {
871 id: project.id,
872 connection_ids,
873 should_unshare: false,
874 })
875 })
876 .await
877 }
878
879 /// Removes the given connection from the specified project.
880 pub async fn leave_project(
881 &self,
882 project_id: ProjectId,
883 connection: ConnectionId,
884 ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
885 self.project_transaction(project_id, |tx| async move {
886 let result = project_collaborator::Entity::delete_many()
887 .filter(
888 Condition::all()
889 .add(project_collaborator::Column::ProjectId.eq(project_id))
890 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
891 .add(
892 project_collaborator::Column::ConnectionServerId
893 .eq(connection.owner_id as i32),
894 ),
895 )
896 .exec(&*tx)
897 .await?;
898 if result.rows_affected == 0 {
899 Err(anyhow!("not a collaborator on this project"))?;
900 }
901
902 let project = project::Entity::find_by_id(project_id)
903 .one(&*tx)
904 .await?
905 .ok_or_else(|| anyhow!("no such project"))?;
906 let collaborators = project
907 .find_related(project_collaborator::Entity)
908 .all(&*tx)
909 .await?;
910 let connection_ids: Vec<ConnectionId> = collaborators
911 .into_iter()
912 .map(|collaborator| collaborator.connection())
913 .collect();
914
915 follower::Entity::delete_many()
916 .filter(
917 Condition::any()
918 .add(
919 Condition::all()
920 .add(follower::Column::ProjectId.eq(Some(project_id)))
921 .add(
922 follower::Column::LeaderConnectionServerId
923 .eq(connection.owner_id),
924 )
925 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
926 )
927 .add(
928 Condition::all()
929 .add(follower::Column::ProjectId.eq(Some(project_id)))
930 .add(
931 follower::Column::FollowerConnectionServerId
932 .eq(connection.owner_id),
933 )
934 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
935 ),
936 )
937 .exec(&*tx)
938 .await?;
939
940 let room = if let Some(room_id) = project.room_id {
941 Some(self.get_room(room_id, &tx).await?)
942 } else {
943 None
944 };
945
946 let left_project = LeftProject {
947 id: project_id,
948 should_unshare: connection == project.host_connection()?,
949 connection_ids,
950 };
951 Ok((room, left_project))
952 })
953 .await
954 }
955
956 pub async fn check_user_is_project_host(
957 &self,
958 project_id: ProjectId,
959 connection_id: ConnectionId,
960 ) -> Result<()> {
961 self.project_transaction(project_id, |tx| async move {
962 project::Entity::find()
963 .filter(
964 Condition::all()
965 .add(project::Column::Id.eq(project_id))
966 .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
967 .add(
968 project::Column::HostConnectionServerId
969 .eq(Some(connection_id.owner_id as i32)),
970 ),
971 )
972 .one(&*tx)
973 .await?
974 .ok_or_else(|| anyhow!("failed to read project host"))?;
975
976 Ok(())
977 })
978 .await
979 .map(|guard| guard.into_inner())
980 }
981
982 /// Returns the current project if the given user is authorized to access it with the specified capability.
983 pub async fn access_project(
984 &self,
985 project_id: ProjectId,
986 connection_id: ConnectionId,
987 principal_id: PrincipalId,
988 capability: Capability,
989 tx: &DatabaseTransaction,
990 ) -> Result<(project::Model, ChannelRole)> {
991 let (mut project, dev_server_project) = project::Entity::find_by_id(project_id)
992 .find_also_related(dev_server_project::Entity)
993 .one(tx)
994 .await?
995 .ok_or_else(|| anyhow!("no such project"))?;
996
997 let user_id = match principal_id {
998 PrincipalId::DevServerId(_) => {
999 if project
1000 .host_connection()
1001 .is_ok_and(|connection| connection == connection_id)
1002 {
1003 return Ok((project, ChannelRole::Admin));
1004 }
1005 return Err(anyhow!("not the project host"))?;
1006 }
1007 PrincipalId::UserId(user_id) => user_id,
1008 };
1009
1010 let role_from_room = if let Some(room_id) = project.room_id {
1011 room_participant::Entity::find()
1012 .filter(room_participant::Column::RoomId.eq(room_id))
1013 .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
1014 .one(tx)
1015 .await?
1016 .and_then(|participant| participant.role)
1017 } else {
1018 None
1019 };
1020 let role_from_dev_server = if let Some(dev_server_project) = dev_server_project {
1021 let dev_server = dev_server::Entity::find_by_id(dev_server_project.dev_server_id)
1022 .one(tx)
1023 .await?
1024 .ok_or_else(|| anyhow!("no such channel"))?;
1025 if user_id == dev_server.user_id {
1026 // If the user left the room "uncleanly" they may rejoin the
1027 // remote project before leave_room runs. IN that case kick
1028 // the project out of the room pre-emptively.
1029 if role_from_room.is_none() {
1030 project = project::Entity::update(project::ActiveModel {
1031 room_id: ActiveValue::Set(None),
1032 ..project.into_active_model()
1033 })
1034 .exec(tx)
1035 .await?;
1036 }
1037 Some(ChannelRole::Admin)
1038 } else {
1039 None
1040 }
1041 } else {
1042 None
1043 };
1044
1045 let role = role_from_dev_server
1046 .or(role_from_room)
1047 .unwrap_or(ChannelRole::Banned);
1048
1049 match capability {
1050 Capability::ReadWrite => {
1051 if !role.can_edit_projects() {
1052 return Err(anyhow!("not authorized to edit projects"))?;
1053 }
1054 }
1055 Capability::ReadOnly => {
1056 if !role.can_read_projects() {
1057 return Err(anyhow!("not authorized to read projects"))?;
1058 }
1059 }
1060 }
1061
1062 Ok((project, role))
1063 }
1064
1065 /// Returns the host connection for a read-only request to join a shared project.
1066 pub async fn host_for_read_only_project_request(
1067 &self,
1068 project_id: ProjectId,
1069 connection_id: ConnectionId,
1070 user_id: UserId,
1071 ) -> Result<ConnectionId> {
1072 self.project_transaction(project_id, |tx| async move {
1073 let (project, _) = self
1074 .access_project(
1075 project_id,
1076 connection_id,
1077 PrincipalId::UserId(user_id),
1078 Capability::ReadOnly,
1079 &tx,
1080 )
1081 .await?;
1082 project.host_connection()
1083 })
1084 .await
1085 .map(|guard| guard.into_inner())
1086 }
1087
1088 /// Returns the host connection for a request to join a shared project.
1089 pub async fn host_for_mutating_project_request(
1090 &self,
1091 project_id: ProjectId,
1092 connection_id: ConnectionId,
1093 user_id: UserId,
1094 ) -> Result<ConnectionId> {
1095 self.project_transaction(project_id, |tx| async move {
1096 let (project, _) = self
1097 .access_project(
1098 project_id,
1099 connection_id,
1100 PrincipalId::UserId(user_id),
1101 Capability::ReadWrite,
1102 &tx,
1103 )
1104 .await?;
1105 project.host_connection()
1106 })
1107 .await
1108 .map(|guard| guard.into_inner())
1109 }
1110
1111 /// Returns the host connection for a request to join a shared project.
1112 pub async fn host_for_owner_project_request(
1113 &self,
1114 project_id: ProjectId,
1115 _connection_id: ConnectionId,
1116 user_id: UserId,
1117 ) -> Result<ConnectionId> {
1118 self.project_transaction(project_id, |tx| async move {
1119 let (project, dev_server_project) = project::Entity::find_by_id(project_id)
1120 .find_also_related(dev_server_project::Entity)
1121 .one(&*tx)
1122 .await?
1123 .ok_or_else(|| anyhow!("no such project"))?;
1124
1125 let Some(dev_server_project) = dev_server_project else {
1126 return Err(anyhow!("not a dev server project"))?;
1127 };
1128 let dev_server = dev_server::Entity::find_by_id(dev_server_project.dev_server_id)
1129 .one(&*tx)
1130 .await?
1131 .ok_or_else(|| anyhow!("no such dev server"))?;
1132 if dev_server.user_id != user_id {
1133 return Err(anyhow!("not your project"))?;
1134 }
1135 project.host_connection()
1136 })
1137 .await
1138 .map(|guard| guard.into_inner())
1139 }
1140
1141 pub async fn connections_for_buffer_update(
1142 &self,
1143 project_id: ProjectId,
1144 principal_id: PrincipalId,
1145 connection_id: ConnectionId,
1146 capability: Capability,
1147 ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1148 self.project_transaction(project_id, |tx| async move {
1149 // Authorize
1150 let (project, _) = self
1151 .access_project(project_id, connection_id, principal_id, capability, &tx)
1152 .await?;
1153
1154 let host_connection_id = project.host_connection()?;
1155
1156 let collaborators = project_collaborator::Entity::find()
1157 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1158 .all(&*tx)
1159 .await?;
1160
1161 let guest_connection_ids = collaborators
1162 .into_iter()
1163 .filter_map(|collaborator| {
1164 if collaborator.is_host {
1165 None
1166 } else {
1167 Some(collaborator.connection())
1168 }
1169 })
1170 .collect();
1171
1172 Ok((host_connection_id, guest_connection_ids))
1173 })
1174 .await
1175 }
1176
1177 /// Returns the connection IDs in the given project.
1178 ///
1179 /// The provided `connection_id` must also be a collaborator in the project,
1180 /// otherwise an error will be returned.
1181 pub async fn project_connection_ids(
1182 &self,
1183 project_id: ProjectId,
1184 connection_id: ConnectionId,
1185 exclude_dev_server: bool,
1186 ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1187 self.project_transaction(project_id, |tx| async move {
1188 let project = project::Entity::find_by_id(project_id)
1189 .one(&*tx)
1190 .await?
1191 .ok_or_else(|| anyhow!("no such project"))?;
1192
1193 let mut collaborators = project_collaborator::Entity::find()
1194 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1195 .stream(&*tx)
1196 .await?;
1197
1198 let mut connection_ids = HashSet::default();
1199 if let Some(host_connection) = project.host_connection().log_err() {
1200 if !exclude_dev_server {
1201 connection_ids.insert(host_connection);
1202 }
1203 }
1204
1205 while let Some(collaborator) = collaborators.next().await {
1206 let collaborator = collaborator?;
1207 connection_ids.insert(collaborator.connection());
1208 }
1209
1210 if connection_ids.contains(&connection_id)
1211 || Some(connection_id) == project.host_connection().ok()
1212 {
1213 Ok(connection_ids)
1214 } else {
1215 Err(anyhow!(
1216 "can only send project updates to a project you're in"
1217 ))?
1218 }
1219 })
1220 .await
1221 }
1222
1223 async fn project_guest_connection_ids(
1224 &self,
1225 project_id: ProjectId,
1226 tx: &DatabaseTransaction,
1227 ) -> Result<Vec<ConnectionId>> {
1228 let mut collaborators = project_collaborator::Entity::find()
1229 .filter(
1230 project_collaborator::Column::ProjectId
1231 .eq(project_id)
1232 .and(project_collaborator::Column::IsHost.eq(false)),
1233 )
1234 .stream(tx)
1235 .await?;
1236
1237 let mut guest_connection_ids = Vec::new();
1238 while let Some(collaborator) = collaborators.next().await {
1239 let collaborator = collaborator?;
1240 guest_connection_ids.push(collaborator.connection());
1241 }
1242 Ok(guest_connection_ids)
1243 }
1244
1245 /// Returns the [`RoomId`] for the given project.
1246 pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1247 self.transaction(|tx| async move {
1248 Ok(project::Entity::find_by_id(project_id)
1249 .one(&*tx)
1250 .await?
1251 .and_then(|project| project.room_id))
1252 })
1253 .await
1254 }
1255
1256 pub async fn check_room_participants(
1257 &self,
1258 room_id: RoomId,
1259 leader_id: ConnectionId,
1260 follower_id: ConnectionId,
1261 ) -> Result<()> {
1262 self.transaction(|tx| async move {
1263 use room_participant::Column;
1264
1265 let count = room_participant::Entity::find()
1266 .filter(
1267 Condition::all().add(Column::RoomId.eq(room_id)).add(
1268 Condition::any()
1269 .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1270 Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1271 ))
1272 .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1273 Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1274 )),
1275 ),
1276 )
1277 .count(&*tx)
1278 .await?;
1279
1280 if count < 2 {
1281 Err(anyhow!("not room participants"))?;
1282 }
1283
1284 Ok(())
1285 })
1286 .await
1287 }
1288
1289 /// Adds the given follower connection as a follower of the given leader connection.
1290 pub async fn follow(
1291 &self,
1292 room_id: RoomId,
1293 project_id: ProjectId,
1294 leader_connection: ConnectionId,
1295 follower_connection: ConnectionId,
1296 ) -> Result<TransactionGuard<proto::Room>> {
1297 self.room_transaction(room_id, |tx| async move {
1298 follower::ActiveModel {
1299 room_id: ActiveValue::set(room_id),
1300 project_id: ActiveValue::set(project_id),
1301 leader_connection_server_id: ActiveValue::set(ServerId(
1302 leader_connection.owner_id as i32,
1303 )),
1304 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1305 follower_connection_server_id: ActiveValue::set(ServerId(
1306 follower_connection.owner_id as i32,
1307 )),
1308 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1309 ..Default::default()
1310 }
1311 .insert(&*tx)
1312 .await?;
1313
1314 let room = self.get_room(room_id, &tx).await?;
1315 Ok(room)
1316 })
1317 .await
1318 }
1319
1320 /// Removes the given follower connection as a follower of the given leader connection.
1321 pub async fn unfollow(
1322 &self,
1323 room_id: RoomId,
1324 project_id: ProjectId,
1325 leader_connection: ConnectionId,
1326 follower_connection: ConnectionId,
1327 ) -> Result<TransactionGuard<proto::Room>> {
1328 self.room_transaction(room_id, |tx| async move {
1329 follower::Entity::delete_many()
1330 .filter(
1331 Condition::all()
1332 .add(follower::Column::RoomId.eq(room_id))
1333 .add(follower::Column::ProjectId.eq(project_id))
1334 .add(
1335 follower::Column::LeaderConnectionServerId
1336 .eq(leader_connection.owner_id),
1337 )
1338 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1339 .add(
1340 follower::Column::FollowerConnectionServerId
1341 .eq(follower_connection.owner_id),
1342 )
1343 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1344 )
1345 .exec(&*tx)
1346 .await?;
1347
1348 let room = self.get_room(room_id, &tx).await?;
1349 Ok(room)
1350 })
1351 .await
1352 }
1353}