1use util::ResultExt;
2
3use super::*;
4
5impl Database {
6 /// Returns the count of all projects, excluding ones marked as admin.
7 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
8 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
9 enum QueryAs {
10 Count,
11 }
12
13 self.transaction(|tx| async move {
14 Ok(project::Entity::find()
15 .select_only()
16 .column_as(project::Column::Id.count(), QueryAs::Count)
17 .inner_join(user::Entity)
18 .filter(user::Column::Admin.eq(false))
19 .into_values::<_, QueryAs>()
20 .one(&*tx)
21 .await?
22 .unwrap_or(0i64) as usize)
23 })
24 .await
25 }
26
27 /// Shares a project with the given room.
28 pub async fn share_project(
29 &self,
30 room_id: RoomId,
31 connection: ConnectionId,
32 worktrees: &[proto::WorktreeMetadata],
33 dev_server_project_id: Option<DevServerProjectId>,
34 ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
35 self.room_transaction(room_id, |tx| async move {
36 let participant = room_participant::Entity::find()
37 .filter(
38 Condition::all()
39 .add(
40 room_participant::Column::AnsweringConnectionId
41 .eq(connection.id as i32),
42 )
43 .add(
44 room_participant::Column::AnsweringConnectionServerId
45 .eq(connection.owner_id as i32),
46 ),
47 )
48 .one(&*tx)
49 .await?
50 .ok_or_else(|| anyhow!("could not find participant"))?;
51 if participant.room_id != room_id {
52 return Err(anyhow!("shared project on unexpected room"))?;
53 }
54 if !participant
55 .role
56 .unwrap_or(ChannelRole::Member)
57 .can_edit_projects()
58 {
59 return Err(anyhow!("guests cannot share projects"))?;
60 }
61
62 if let Some(dev_server_project_id) = dev_server_project_id {
63 let project = project::Entity::find()
64 .filter(project::Column::DevServerProjectId.eq(Some(dev_server_project_id)))
65 .one(&*tx)
66 .await?
67 .ok_or_else(|| anyhow!("no remote project"))?;
68
69 let (_, dev_server) = dev_server_project::Entity::find_by_id(dev_server_project_id)
70 .find_also_related(dev_server::Entity)
71 .one(&*tx)
72 .await?
73 .ok_or_else(|| anyhow!("no dev_server_project"))?;
74
75 if !dev_server.is_some_and(|dev_server| dev_server.user_id == participant.user_id) {
76 return Err(anyhow!("not your dev server"))?;
77 }
78
79 if project.room_id.is_some() {
80 return Err(anyhow!("project already shared"))?;
81 };
82
83 let project = project::Entity::update(project::ActiveModel {
84 room_id: ActiveValue::Set(Some(room_id)),
85 ..project.into_active_model()
86 })
87 .exec(&*tx)
88 .await?;
89
90 let room = self.get_room(room_id, &tx).await?;
91 return Ok((project.id, room));
92 }
93
94 let project = project::ActiveModel {
95 room_id: ActiveValue::set(Some(participant.room_id)),
96 host_user_id: ActiveValue::set(Some(participant.user_id)),
97 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
98 host_connection_server_id: ActiveValue::set(Some(ServerId(
99 connection.owner_id as i32,
100 ))),
101 id: ActiveValue::NotSet,
102 hosted_project_id: ActiveValue::Set(None),
103 dev_server_project_id: ActiveValue::Set(None),
104 }
105 .insert(&*tx)
106 .await?;
107
108 if !worktrees.is_empty() {
109 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
110 worktree::ActiveModel {
111 id: ActiveValue::set(worktree.id as i64),
112 project_id: ActiveValue::set(project.id),
113 abs_path: ActiveValue::set(worktree.abs_path.clone()),
114 root_name: ActiveValue::set(worktree.root_name.clone()),
115 visible: ActiveValue::set(worktree.visible),
116 scan_id: ActiveValue::set(0),
117 completed_scan_id: ActiveValue::set(0),
118 }
119 }))
120 .exec(&*tx)
121 .await?;
122 }
123
124 project_collaborator::ActiveModel {
125 project_id: ActiveValue::set(project.id),
126 connection_id: ActiveValue::set(connection.id as i32),
127 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
128 user_id: ActiveValue::set(participant.user_id),
129 replica_id: ActiveValue::set(ReplicaId(0)),
130 is_host: ActiveValue::set(true),
131 ..Default::default()
132 }
133 .insert(&*tx)
134 .await?;
135
136 let room = self.get_room(room_id, &tx).await?;
137 Ok((project.id, room))
138 })
139 .await
140 }
141
142 pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
143 self.weak_transaction(|tx| async move {
144 project::Entity::delete_by_id(project_id).exec(&*tx).await?;
145 Ok(())
146 })
147 .await
148 }
149
150 /// Unshares the given project.
151 pub async fn unshare_project(
152 &self,
153 project_id: ProjectId,
154 connection: ConnectionId,
155 user_id: Option<UserId>,
156 ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
157 self.project_transaction(project_id, |tx| async move {
158 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
159 let project = project::Entity::find_by_id(project_id)
160 .one(&*tx)
161 .await?
162 .ok_or_else(|| anyhow!("project not found"))?;
163 let room = if let Some(room_id) = project.room_id {
164 Some(self.get_room(room_id, &tx).await?)
165 } else {
166 None
167 };
168 if project.host_connection()? == connection {
169 return Ok((true, room, guest_connection_ids));
170 }
171 if let Some(dev_server_project_id) = project.dev_server_project_id {
172 if let Some(user_id) = user_id {
173 if user_id
174 != self
175 .owner_for_dev_server_project(dev_server_project_id, &tx)
176 .await?
177 {
178 Err(anyhow!("cannot unshare a project hosted by another user"))?
179 }
180 project::Entity::update(project::ActiveModel {
181 room_id: ActiveValue::Set(None),
182 ..project.into_active_model()
183 })
184 .exec(&*tx)
185 .await?;
186 return Ok((false, room, guest_connection_ids));
187 }
188 }
189
190 Err(anyhow!("cannot unshare a project hosted by another user"))?
191 })
192 .await
193 }
194
195 /// Updates the worktrees associated with the given project.
196 pub async fn update_project(
197 &self,
198 project_id: ProjectId,
199 connection: ConnectionId,
200 worktrees: &[proto::WorktreeMetadata],
201 ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
202 self.project_transaction(project_id, |tx| async move {
203 let project = project::Entity::find_by_id(project_id)
204 .filter(
205 Condition::all()
206 .add(project::Column::HostConnectionId.eq(connection.id as i32))
207 .add(
208 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
209 ),
210 )
211 .one(&*tx)
212 .await?
213 .ok_or_else(|| anyhow!("no such project"))?;
214
215 self.update_project_worktrees(project.id, worktrees, &tx)
216 .await?;
217
218 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
219
220 let room = if let Some(room_id) = project.room_id {
221 Some(self.get_room(room_id, &tx).await?)
222 } else {
223 None
224 };
225
226 Ok((room, guest_connection_ids))
227 })
228 .await
229 }
230
231 pub(in crate::db) async fn update_project_worktrees(
232 &self,
233 project_id: ProjectId,
234 worktrees: &[proto::WorktreeMetadata],
235 tx: &DatabaseTransaction,
236 ) -> Result<()> {
237 if !worktrees.is_empty() {
238 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
239 id: ActiveValue::set(worktree.id as i64),
240 project_id: ActiveValue::set(project_id),
241 abs_path: ActiveValue::set(worktree.abs_path.clone()),
242 root_name: ActiveValue::set(worktree.root_name.clone()),
243 visible: ActiveValue::set(worktree.visible),
244 scan_id: ActiveValue::set(0),
245 completed_scan_id: ActiveValue::set(0),
246 }))
247 .on_conflict(
248 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
249 .update_column(worktree::Column::RootName)
250 .to_owned(),
251 )
252 .exec(tx)
253 .await?;
254 }
255
256 worktree::Entity::delete_many()
257 .filter(worktree::Column::ProjectId.eq(project_id).and(
258 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
259 ))
260 .exec(tx)
261 .await?;
262
263 Ok(())
264 }
265
266 pub async fn update_worktree(
267 &self,
268 update: &proto::UpdateWorktree,
269 connection: ConnectionId,
270 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
271 let project_id = ProjectId::from_proto(update.project_id);
272 let worktree_id = update.worktree_id as i64;
273 self.project_transaction(project_id, |tx| async move {
274 // Ensure the update comes from the host.
275 let _project = project::Entity::find_by_id(project_id)
276 .filter(
277 Condition::all()
278 .add(project::Column::HostConnectionId.eq(connection.id as i32))
279 .add(
280 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
281 ),
282 )
283 .one(&*tx)
284 .await?
285 .ok_or_else(|| anyhow!("no such project"))?;
286
287 // Update metadata.
288 worktree::Entity::update(worktree::ActiveModel {
289 id: ActiveValue::set(worktree_id),
290 project_id: ActiveValue::set(project_id),
291 root_name: ActiveValue::set(update.root_name.clone()),
292 scan_id: ActiveValue::set(update.scan_id as i64),
293 completed_scan_id: if update.is_last_update {
294 ActiveValue::set(update.scan_id as i64)
295 } else {
296 ActiveValue::default()
297 },
298 abs_path: ActiveValue::set(update.abs_path.clone()),
299 ..Default::default()
300 })
301 .exec(&*tx)
302 .await?;
303
304 if !update.updated_entries.is_empty() {
305 worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
306 let mtime = entry.mtime.clone().unwrap_or_default();
307 worktree_entry::ActiveModel {
308 project_id: ActiveValue::set(project_id),
309 worktree_id: ActiveValue::set(worktree_id),
310 id: ActiveValue::set(entry.id as i64),
311 is_dir: ActiveValue::set(entry.is_dir),
312 path: ActiveValue::set(entry.path.clone()),
313 inode: ActiveValue::set(entry.inode as i64),
314 mtime_seconds: ActiveValue::set(mtime.seconds as i64),
315 mtime_nanos: ActiveValue::set(mtime.nanos as i32),
316 is_symlink: ActiveValue::set(entry.is_symlink),
317 is_ignored: ActiveValue::set(entry.is_ignored),
318 is_external: ActiveValue::set(entry.is_external),
319 git_status: ActiveValue::set(entry.git_status.map(|status| status as i64)),
320 is_deleted: ActiveValue::set(false),
321 scan_id: ActiveValue::set(update.scan_id as i64),
322 }
323 }))
324 .on_conflict(
325 OnConflict::columns([
326 worktree_entry::Column::ProjectId,
327 worktree_entry::Column::WorktreeId,
328 worktree_entry::Column::Id,
329 ])
330 .update_columns([
331 worktree_entry::Column::IsDir,
332 worktree_entry::Column::Path,
333 worktree_entry::Column::Inode,
334 worktree_entry::Column::MtimeSeconds,
335 worktree_entry::Column::MtimeNanos,
336 worktree_entry::Column::IsSymlink,
337 worktree_entry::Column::IsIgnored,
338 worktree_entry::Column::GitStatus,
339 worktree_entry::Column::ScanId,
340 ])
341 .to_owned(),
342 )
343 .exec(&*tx)
344 .await?;
345 }
346
347 if !update.removed_entries.is_empty() {
348 worktree_entry::Entity::update_many()
349 .filter(
350 worktree_entry::Column::ProjectId
351 .eq(project_id)
352 .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
353 .and(
354 worktree_entry::Column::Id
355 .is_in(update.removed_entries.iter().map(|id| *id as i64)),
356 ),
357 )
358 .set(worktree_entry::ActiveModel {
359 is_deleted: ActiveValue::Set(true),
360 scan_id: ActiveValue::Set(update.scan_id as i64),
361 ..Default::default()
362 })
363 .exec(&*tx)
364 .await?;
365 }
366
367 if !update.updated_repositories.is_empty() {
368 worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
369 |repository| worktree_repository::ActiveModel {
370 project_id: ActiveValue::set(project_id),
371 worktree_id: ActiveValue::set(worktree_id),
372 work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
373 scan_id: ActiveValue::set(update.scan_id as i64),
374 branch: ActiveValue::set(repository.branch.clone()),
375 is_deleted: ActiveValue::set(false),
376 },
377 ))
378 .on_conflict(
379 OnConflict::columns([
380 worktree_repository::Column::ProjectId,
381 worktree_repository::Column::WorktreeId,
382 worktree_repository::Column::WorkDirectoryId,
383 ])
384 .update_columns([
385 worktree_repository::Column::ScanId,
386 worktree_repository::Column::Branch,
387 ])
388 .to_owned(),
389 )
390 .exec(&*tx)
391 .await?;
392 }
393
394 if !update.removed_repositories.is_empty() {
395 worktree_repository::Entity::update_many()
396 .filter(
397 worktree_repository::Column::ProjectId
398 .eq(project_id)
399 .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
400 .and(
401 worktree_repository::Column::WorkDirectoryId
402 .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
403 ),
404 )
405 .set(worktree_repository::ActiveModel {
406 is_deleted: ActiveValue::Set(true),
407 scan_id: ActiveValue::Set(update.scan_id as i64),
408 ..Default::default()
409 })
410 .exec(&*tx)
411 .await?;
412 }
413
414 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
415 Ok(connection_ids)
416 })
417 .await
418 }
419
420 /// Updates the diagnostic summary for the given connection.
421 pub async fn update_diagnostic_summary(
422 &self,
423 update: &proto::UpdateDiagnosticSummary,
424 connection: ConnectionId,
425 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
426 let project_id = ProjectId::from_proto(update.project_id);
427 let worktree_id = update.worktree_id as i64;
428 self.project_transaction(project_id, |tx| async move {
429 let summary = update
430 .summary
431 .as_ref()
432 .ok_or_else(|| anyhow!("invalid summary"))?;
433
434 // Ensure the update comes from the host.
435 let project = project::Entity::find_by_id(project_id)
436 .one(&*tx)
437 .await?
438 .ok_or_else(|| anyhow!("no such project"))?;
439 if project.host_connection()? != connection {
440 return Err(anyhow!("can't update a project hosted by someone else"))?;
441 }
442
443 // Update summary.
444 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
445 project_id: ActiveValue::set(project_id),
446 worktree_id: ActiveValue::set(worktree_id),
447 path: ActiveValue::set(summary.path.clone()),
448 language_server_id: ActiveValue::set(summary.language_server_id as i64),
449 error_count: ActiveValue::set(summary.error_count as i32),
450 warning_count: ActiveValue::set(summary.warning_count as i32),
451 })
452 .on_conflict(
453 OnConflict::columns([
454 worktree_diagnostic_summary::Column::ProjectId,
455 worktree_diagnostic_summary::Column::WorktreeId,
456 worktree_diagnostic_summary::Column::Path,
457 ])
458 .update_columns([
459 worktree_diagnostic_summary::Column::LanguageServerId,
460 worktree_diagnostic_summary::Column::ErrorCount,
461 worktree_diagnostic_summary::Column::WarningCount,
462 ])
463 .to_owned(),
464 )
465 .exec(&*tx)
466 .await?;
467
468 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
469 Ok(connection_ids)
470 })
471 .await
472 }
473
474 /// Starts the language server for the given connection.
475 pub async fn start_language_server(
476 &self,
477 update: &proto::StartLanguageServer,
478 connection: ConnectionId,
479 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
480 let project_id = ProjectId::from_proto(update.project_id);
481 self.project_transaction(project_id, |tx| async move {
482 let server = update
483 .server
484 .as_ref()
485 .ok_or_else(|| anyhow!("invalid language server"))?;
486
487 // Ensure the update comes from the host.
488 let project = project::Entity::find_by_id(project_id)
489 .one(&*tx)
490 .await?
491 .ok_or_else(|| anyhow!("no such project"))?;
492 if project.host_connection()? != connection {
493 return Err(anyhow!("can't update a project hosted by someone else"))?;
494 }
495
496 // Add the newly-started language server.
497 language_server::Entity::insert(language_server::ActiveModel {
498 project_id: ActiveValue::set(project_id),
499 id: ActiveValue::set(server.id as i64),
500 name: ActiveValue::set(server.name.clone()),
501 })
502 .on_conflict(
503 OnConflict::columns([
504 language_server::Column::ProjectId,
505 language_server::Column::Id,
506 ])
507 .update_column(language_server::Column::Name)
508 .to_owned(),
509 )
510 .exec(&*tx)
511 .await?;
512
513 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
514 Ok(connection_ids)
515 })
516 .await
517 }
518
519 /// Updates the worktree settings for the given connection.
520 pub async fn update_worktree_settings(
521 &self,
522 update: &proto::UpdateWorktreeSettings,
523 connection: ConnectionId,
524 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
525 let project_id = ProjectId::from_proto(update.project_id);
526 self.project_transaction(project_id, |tx| async move {
527 // Ensure the update comes from the host.
528 let project = project::Entity::find_by_id(project_id)
529 .one(&*tx)
530 .await?
531 .ok_or_else(|| anyhow!("no such project"))?;
532 if project.host_connection()? != connection {
533 return Err(anyhow!("can't update a project hosted by someone else"))?;
534 }
535
536 if let Some(content) = &update.content {
537 worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
538 project_id: ActiveValue::Set(project_id),
539 worktree_id: ActiveValue::Set(update.worktree_id as i64),
540 path: ActiveValue::Set(update.path.clone()),
541 content: ActiveValue::Set(content.clone()),
542 })
543 .on_conflict(
544 OnConflict::columns([
545 worktree_settings_file::Column::ProjectId,
546 worktree_settings_file::Column::WorktreeId,
547 worktree_settings_file::Column::Path,
548 ])
549 .update_column(worktree_settings_file::Column::Content)
550 .to_owned(),
551 )
552 .exec(&*tx)
553 .await?;
554 } else {
555 worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
556 project_id: ActiveValue::Set(project_id),
557 worktree_id: ActiveValue::Set(update.worktree_id as i64),
558 path: ActiveValue::Set(update.path.clone()),
559 ..Default::default()
560 })
561 .exec(&*tx)
562 .await?;
563 }
564
565 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
566 Ok(connection_ids)
567 })
568 .await
569 }
570
571 /// Adds the given connection to the specified hosted project
572 pub async fn join_hosted_project(
573 &self,
574 id: ProjectId,
575 user_id: UserId,
576 connection: ConnectionId,
577 ) -> Result<(Project, ReplicaId)> {
578 self.transaction(|tx| async move {
579 let (project, hosted_project) = project::Entity::find_by_id(id)
580 .find_also_related(hosted_project::Entity)
581 .one(&*tx)
582 .await?
583 .ok_or_else(|| anyhow!("hosted project is no longer shared"))?;
584
585 let Some(hosted_project) = hosted_project else {
586 return Err(anyhow!("project is not hosted"))?;
587 };
588
589 let channel = channel::Entity::find_by_id(hosted_project.channel_id)
590 .one(&*tx)
591 .await?
592 .ok_or_else(|| anyhow!("no such channel"))?;
593
594 let role = self
595 .check_user_is_channel_participant(&channel, user_id, &tx)
596 .await?;
597
598 self.join_project_internal(project, user_id, connection, role, &tx)
599 .await
600 })
601 .await
602 }
603
604 pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
605 self.transaction(|tx| async move {
606 Ok(project::Entity::find_by_id(id)
607 .one(&*tx)
608 .await?
609 .ok_or_else(|| anyhow!("no such project"))?)
610 })
611 .await
612 }
613
614 pub async fn find_dev_server_project(&self, id: DevServerProjectId) -> Result<project::Model> {
615 self.transaction(|tx| async move {
616 Ok(project::Entity::find()
617 .filter(project::Column::DevServerProjectId.eq(id))
618 .one(&*tx)
619 .await?
620 .ok_or_else(|| anyhow!("no such project"))?)
621 })
622 .await
623 }
624
625 /// Adds the given connection to the specified project
626 /// in the current room.
627 pub async fn join_project(
628 &self,
629 project_id: ProjectId,
630 connection: ConnectionId,
631 user_id: UserId,
632 ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
633 self.project_transaction(project_id, |tx| async move {
634 let (project, role) = self
635 .access_project(
636 project_id,
637 connection,
638 PrincipalId::UserId(user_id),
639 Capability::ReadOnly,
640 &tx,
641 )
642 .await?;
643 self.join_project_internal(project, user_id, connection, role, &tx)
644 .await
645 })
646 .await
647 }
648
649 async fn join_project_internal(
650 &self,
651 project: project::Model,
652 user_id: UserId,
653 connection: ConnectionId,
654 role: ChannelRole,
655 tx: &DatabaseTransaction,
656 ) -> Result<(Project, ReplicaId)> {
657 let mut collaborators = project
658 .find_related(project_collaborator::Entity)
659 .all(tx)
660 .await?;
661 let replica_ids = collaborators
662 .iter()
663 .map(|c| c.replica_id)
664 .collect::<HashSet<_>>();
665 let mut replica_id = ReplicaId(1);
666 while replica_ids.contains(&replica_id) {
667 replica_id.0 += 1;
668 }
669 let new_collaborator = project_collaborator::ActiveModel {
670 project_id: ActiveValue::set(project.id),
671 connection_id: ActiveValue::set(connection.id as i32),
672 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
673 user_id: ActiveValue::set(user_id),
674 replica_id: ActiveValue::set(replica_id),
675 is_host: ActiveValue::set(false),
676 ..Default::default()
677 }
678 .insert(tx)
679 .await?;
680 collaborators.push(new_collaborator);
681
682 let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
683 let mut worktrees = db_worktrees
684 .into_iter()
685 .map(|db_worktree| {
686 (
687 db_worktree.id as u64,
688 Worktree {
689 id: db_worktree.id as u64,
690 abs_path: db_worktree.abs_path,
691 root_name: db_worktree.root_name,
692 visible: db_worktree.visible,
693 entries: Default::default(),
694 repository_entries: Default::default(),
695 diagnostic_summaries: Default::default(),
696 settings_files: Default::default(),
697 scan_id: db_worktree.scan_id as u64,
698 completed_scan_id: db_worktree.completed_scan_id as u64,
699 },
700 )
701 })
702 .collect::<BTreeMap<_, _>>();
703
704 // Populate worktree entries.
705 {
706 let mut db_entries = worktree_entry::Entity::find()
707 .filter(
708 Condition::all()
709 .add(worktree_entry::Column::ProjectId.eq(project.id))
710 .add(worktree_entry::Column::IsDeleted.eq(false)),
711 )
712 .stream(tx)
713 .await?;
714 while let Some(db_entry) = db_entries.next().await {
715 let db_entry = db_entry?;
716 if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
717 worktree.entries.push(proto::Entry {
718 id: db_entry.id as u64,
719 is_dir: db_entry.is_dir,
720 path: db_entry.path,
721 inode: db_entry.inode as u64,
722 mtime: Some(proto::Timestamp {
723 seconds: db_entry.mtime_seconds as u64,
724 nanos: db_entry.mtime_nanos as u32,
725 }),
726 is_symlink: db_entry.is_symlink,
727 is_ignored: db_entry.is_ignored,
728 is_external: db_entry.is_external,
729 git_status: db_entry.git_status.map(|status| status as i32),
730 });
731 }
732 }
733 }
734
735 // Populate repository entries.
736 {
737 let mut db_repository_entries = worktree_repository::Entity::find()
738 .filter(
739 Condition::all()
740 .add(worktree_repository::Column::ProjectId.eq(project.id))
741 .add(worktree_repository::Column::IsDeleted.eq(false)),
742 )
743 .stream(tx)
744 .await?;
745 while let Some(db_repository_entry) = db_repository_entries.next().await {
746 let db_repository_entry = db_repository_entry?;
747 if let Some(worktree) = worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
748 {
749 worktree.repository_entries.insert(
750 db_repository_entry.work_directory_id as u64,
751 proto::RepositoryEntry {
752 work_directory_id: db_repository_entry.work_directory_id as u64,
753 branch: db_repository_entry.branch,
754 },
755 );
756 }
757 }
758 }
759
760 // Populate worktree diagnostic summaries.
761 {
762 let mut db_summaries = worktree_diagnostic_summary::Entity::find()
763 .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
764 .stream(tx)
765 .await?;
766 while let Some(db_summary) = db_summaries.next().await {
767 let db_summary = db_summary?;
768 if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
769 worktree
770 .diagnostic_summaries
771 .push(proto::DiagnosticSummary {
772 path: db_summary.path,
773 language_server_id: db_summary.language_server_id as u64,
774 error_count: db_summary.error_count as u32,
775 warning_count: db_summary.warning_count as u32,
776 });
777 }
778 }
779 }
780
781 // Populate worktree settings files
782 {
783 let mut db_settings_files = worktree_settings_file::Entity::find()
784 .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
785 .stream(tx)
786 .await?;
787 while let Some(db_settings_file) = db_settings_files.next().await {
788 let db_settings_file = db_settings_file?;
789 if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
790 worktree.settings_files.push(WorktreeSettingsFile {
791 path: db_settings_file.path,
792 content: db_settings_file.content,
793 });
794 }
795 }
796 }
797
798 // Populate language servers.
799 let language_servers = project
800 .find_related(language_server::Entity)
801 .all(tx)
802 .await?;
803
804 let project = Project {
805 id: project.id,
806 role,
807 collaborators: collaborators
808 .into_iter()
809 .map(|collaborator| ProjectCollaborator {
810 connection_id: collaborator.connection(),
811 user_id: collaborator.user_id,
812 replica_id: collaborator.replica_id,
813 is_host: collaborator.is_host,
814 })
815 .collect(),
816 worktrees,
817 language_servers: language_servers
818 .into_iter()
819 .map(|language_server| proto::LanguageServer {
820 id: language_server.id as u64,
821 name: language_server.name,
822 })
823 .collect(),
824 dev_server_project_id: project.dev_server_project_id,
825 };
826 Ok((project, replica_id as ReplicaId))
827 }
828
829 pub async fn leave_hosted_project(
830 &self,
831 project_id: ProjectId,
832 connection: ConnectionId,
833 ) -> Result<LeftProject> {
834 self.transaction(|tx| async move {
835 let result = project_collaborator::Entity::delete_many()
836 .filter(
837 Condition::all()
838 .add(project_collaborator::Column::ProjectId.eq(project_id))
839 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
840 .add(
841 project_collaborator::Column::ConnectionServerId
842 .eq(connection.owner_id as i32),
843 ),
844 )
845 .exec(&*tx)
846 .await?;
847 if result.rows_affected == 0 {
848 return Err(anyhow!("not in the project"))?;
849 }
850
851 let project = project::Entity::find_by_id(project_id)
852 .one(&*tx)
853 .await?
854 .ok_or_else(|| anyhow!("no such project"))?;
855 let collaborators = project
856 .find_related(project_collaborator::Entity)
857 .all(&*tx)
858 .await?;
859 let connection_ids = collaborators
860 .into_iter()
861 .map(|collaborator| collaborator.connection())
862 .collect();
863 Ok(LeftProject {
864 id: project.id,
865 connection_ids,
866 should_unshare: false,
867 })
868 })
869 .await
870 }
871
872 /// Removes the given connection from the specified project.
873 pub async fn leave_project(
874 &self,
875 project_id: ProjectId,
876 connection: ConnectionId,
877 ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
878 self.project_transaction(project_id, |tx| async move {
879 let result = project_collaborator::Entity::delete_many()
880 .filter(
881 Condition::all()
882 .add(project_collaborator::Column::ProjectId.eq(project_id))
883 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
884 .add(
885 project_collaborator::Column::ConnectionServerId
886 .eq(connection.owner_id as i32),
887 ),
888 )
889 .exec(&*tx)
890 .await?;
891 if result.rows_affected == 0 {
892 Err(anyhow!("not a collaborator on this project"))?;
893 }
894
895 let project = project::Entity::find_by_id(project_id)
896 .one(&*tx)
897 .await?
898 .ok_or_else(|| anyhow!("no such project"))?;
899 let collaborators = project
900 .find_related(project_collaborator::Entity)
901 .all(&*tx)
902 .await?;
903 let connection_ids: Vec<ConnectionId> = collaborators
904 .into_iter()
905 .map(|collaborator| collaborator.connection())
906 .collect();
907
908 follower::Entity::delete_many()
909 .filter(
910 Condition::any()
911 .add(
912 Condition::all()
913 .add(follower::Column::ProjectId.eq(Some(project_id)))
914 .add(
915 follower::Column::LeaderConnectionServerId
916 .eq(connection.owner_id),
917 )
918 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
919 )
920 .add(
921 Condition::all()
922 .add(follower::Column::ProjectId.eq(Some(project_id)))
923 .add(
924 follower::Column::FollowerConnectionServerId
925 .eq(connection.owner_id),
926 )
927 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
928 ),
929 )
930 .exec(&*tx)
931 .await?;
932
933 let room = if let Some(room_id) = project.room_id {
934 Some(self.get_room(room_id, &tx).await?)
935 } else {
936 None
937 };
938
939 let left_project = LeftProject {
940 id: project_id,
941 should_unshare: connection == project.host_connection()?,
942 connection_ids,
943 };
944 Ok((room, left_project))
945 })
946 .await
947 }
948
949 pub async fn check_user_is_project_host(
950 &self,
951 project_id: ProjectId,
952 connection_id: ConnectionId,
953 ) -> Result<()> {
954 self.project_transaction(project_id, |tx| async move {
955 project::Entity::find()
956 .filter(
957 Condition::all()
958 .add(project::Column::Id.eq(project_id))
959 .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
960 .add(
961 project::Column::HostConnectionServerId
962 .eq(Some(connection_id.owner_id as i32)),
963 ),
964 )
965 .one(&*tx)
966 .await?
967 .ok_or_else(|| anyhow!("failed to read project host"))?;
968
969 Ok(())
970 })
971 .await
972 .map(|guard| guard.into_inner())
973 }
974
975 /// Returns the current project if the given user is authorized to access it with the specified capability.
976 pub async fn access_project(
977 &self,
978 project_id: ProjectId,
979 connection_id: ConnectionId,
980 principal_id: PrincipalId,
981 capability: Capability,
982 tx: &DatabaseTransaction,
983 ) -> Result<(project::Model, ChannelRole)> {
984 let (mut project, dev_server_project) = project::Entity::find_by_id(project_id)
985 .find_also_related(dev_server_project::Entity)
986 .one(tx)
987 .await?
988 .ok_or_else(|| anyhow!("no such project"))?;
989
990 let user_id = match principal_id {
991 PrincipalId::DevServerId(_) => {
992 if project
993 .host_connection()
994 .is_ok_and(|connection| connection == connection_id)
995 {
996 return Ok((project, ChannelRole::Admin));
997 }
998 return Err(anyhow!("not the project host"))?;
999 }
1000 PrincipalId::UserId(user_id) => user_id,
1001 };
1002
1003 let role_from_room = if let Some(room_id) = project.room_id {
1004 room_participant::Entity::find()
1005 .filter(room_participant::Column::RoomId.eq(room_id))
1006 .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
1007 .one(tx)
1008 .await?
1009 .and_then(|participant| participant.role)
1010 } else {
1011 None
1012 };
1013 let role_from_dev_server = if let Some(dev_server_project) = dev_server_project {
1014 let dev_server = dev_server::Entity::find_by_id(dev_server_project.dev_server_id)
1015 .one(tx)
1016 .await?
1017 .ok_or_else(|| anyhow!("no such channel"))?;
1018 if user_id == dev_server.user_id {
1019 // If the user left the room "uncleanly" they may rejoin the
1020 // remote project before leave_room runs. IN that case kick
1021 // the project out of the room pre-emptively.
1022 if role_from_room.is_none() {
1023 project = project::Entity::update(project::ActiveModel {
1024 room_id: ActiveValue::Set(None),
1025 ..project.into_active_model()
1026 })
1027 .exec(tx)
1028 .await?;
1029 }
1030 Some(ChannelRole::Admin)
1031 } else {
1032 None
1033 }
1034 } else {
1035 None
1036 };
1037
1038 let role = role_from_dev_server
1039 .or(role_from_room)
1040 .unwrap_or(ChannelRole::Banned);
1041
1042 match capability {
1043 Capability::ReadWrite => {
1044 if !role.can_edit_projects() {
1045 return Err(anyhow!("not authorized to edit projects"))?;
1046 }
1047 }
1048 Capability::ReadOnly => {
1049 if !role.can_read_projects() {
1050 return Err(anyhow!("not authorized to read projects"))?;
1051 }
1052 }
1053 }
1054
1055 Ok((project, role))
1056 }
1057
1058 /// Returns the host connection for a read-only request to join a shared project.
1059 pub async fn host_for_read_only_project_request(
1060 &self,
1061 project_id: ProjectId,
1062 connection_id: ConnectionId,
1063 user_id: UserId,
1064 ) -> Result<ConnectionId> {
1065 self.project_transaction(project_id, |tx| async move {
1066 let (project, _) = self
1067 .access_project(
1068 project_id,
1069 connection_id,
1070 PrincipalId::UserId(user_id),
1071 Capability::ReadOnly,
1072 &tx,
1073 )
1074 .await?;
1075 project.host_connection()
1076 })
1077 .await
1078 .map(|guard| guard.into_inner())
1079 }
1080
1081 /// Returns the host connection for a request to join a shared project.
1082 pub async fn host_for_mutating_project_request(
1083 &self,
1084 project_id: ProjectId,
1085 connection_id: ConnectionId,
1086 user_id: UserId,
1087 ) -> Result<ConnectionId> {
1088 self.project_transaction(project_id, |tx| async move {
1089 let (project, _) = self
1090 .access_project(
1091 project_id,
1092 connection_id,
1093 PrincipalId::UserId(user_id),
1094 Capability::ReadWrite,
1095 &tx,
1096 )
1097 .await?;
1098 project.host_connection()
1099 })
1100 .await
1101 .map(|guard| guard.into_inner())
1102 }
1103
1104 /// Returns the host connection for a request to join a shared project.
1105 pub async fn host_for_owner_project_request(
1106 &self,
1107 project_id: ProjectId,
1108 _connection_id: ConnectionId,
1109 user_id: UserId,
1110 ) -> Result<ConnectionId> {
1111 self.project_transaction(project_id, |tx| async move {
1112 let (project, dev_server_project) = project::Entity::find_by_id(project_id)
1113 .find_also_related(dev_server_project::Entity)
1114 .one(&*tx)
1115 .await?
1116 .ok_or_else(|| anyhow!("no such project"))?;
1117
1118 let Some(dev_server_project) = dev_server_project else {
1119 return Err(anyhow!("not a dev server project"))?;
1120 };
1121 let dev_server = dev_server::Entity::find_by_id(dev_server_project.dev_server_id)
1122 .one(&*tx)
1123 .await?
1124 .ok_or_else(|| anyhow!("no such dev server"))?;
1125 if dev_server.user_id != user_id {
1126 return Err(anyhow!("not your project"))?;
1127 }
1128 project.host_connection()
1129 })
1130 .await
1131 .map(|guard| guard.into_inner())
1132 }
1133
1134 pub async fn connections_for_buffer_update(
1135 &self,
1136 project_id: ProjectId,
1137 principal_id: PrincipalId,
1138 connection_id: ConnectionId,
1139 capability: Capability,
1140 ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1141 self.project_transaction(project_id, |tx| async move {
1142 // Authorize
1143 let (project, _) = self
1144 .access_project(project_id, connection_id, principal_id, capability, &tx)
1145 .await?;
1146
1147 let host_connection_id = project.host_connection()?;
1148
1149 let collaborators = project_collaborator::Entity::find()
1150 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1151 .all(&*tx)
1152 .await?;
1153
1154 let guest_connection_ids = collaborators
1155 .into_iter()
1156 .filter_map(|collaborator| {
1157 if collaborator.is_host {
1158 None
1159 } else {
1160 Some(collaborator.connection())
1161 }
1162 })
1163 .collect();
1164
1165 Ok((host_connection_id, guest_connection_ids))
1166 })
1167 .await
1168 }
1169
1170 /// Returns the connection IDs in the given project.
1171 ///
1172 /// The provided `connection_id` must also be a collaborator in the project,
1173 /// otherwise an error will be returned.
1174 pub async fn project_connection_ids(
1175 &self,
1176 project_id: ProjectId,
1177 connection_id: ConnectionId,
1178 exclude_dev_server: bool,
1179 ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1180 self.project_transaction(project_id, |tx| async move {
1181 let project = project::Entity::find_by_id(project_id)
1182 .one(&*tx)
1183 .await?
1184 .ok_or_else(|| anyhow!("no such project"))?;
1185
1186 let mut collaborators = project_collaborator::Entity::find()
1187 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1188 .stream(&*tx)
1189 .await?;
1190
1191 let mut connection_ids = HashSet::default();
1192 if let Some(host_connection) = project.host_connection().log_err() {
1193 if !exclude_dev_server {
1194 connection_ids.insert(host_connection);
1195 }
1196 }
1197
1198 while let Some(collaborator) = collaborators.next().await {
1199 let collaborator = collaborator?;
1200 connection_ids.insert(collaborator.connection());
1201 }
1202
1203 if connection_ids.contains(&connection_id)
1204 || Some(connection_id) == project.host_connection().ok()
1205 {
1206 Ok(connection_ids)
1207 } else {
1208 Err(anyhow!(
1209 "can only send project updates to a project you're in"
1210 ))?
1211 }
1212 })
1213 .await
1214 }
1215
1216 async fn project_guest_connection_ids(
1217 &self,
1218 project_id: ProjectId,
1219 tx: &DatabaseTransaction,
1220 ) -> Result<Vec<ConnectionId>> {
1221 let mut collaborators = project_collaborator::Entity::find()
1222 .filter(
1223 project_collaborator::Column::ProjectId
1224 .eq(project_id)
1225 .and(project_collaborator::Column::IsHost.eq(false)),
1226 )
1227 .stream(tx)
1228 .await?;
1229
1230 let mut guest_connection_ids = Vec::new();
1231 while let Some(collaborator) = collaborators.next().await {
1232 let collaborator = collaborator?;
1233 guest_connection_ids.push(collaborator.connection());
1234 }
1235 Ok(guest_connection_ids)
1236 }
1237
1238 /// Returns the [`RoomId`] for the given project.
1239 pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1240 self.transaction(|tx| async move {
1241 Ok(project::Entity::find_by_id(project_id)
1242 .one(&*tx)
1243 .await?
1244 .and_then(|project| project.room_id))
1245 })
1246 .await
1247 }
1248
1249 pub async fn check_room_participants(
1250 &self,
1251 room_id: RoomId,
1252 leader_id: ConnectionId,
1253 follower_id: ConnectionId,
1254 ) -> Result<()> {
1255 self.transaction(|tx| async move {
1256 use room_participant::Column;
1257
1258 let count = room_participant::Entity::find()
1259 .filter(
1260 Condition::all().add(Column::RoomId.eq(room_id)).add(
1261 Condition::any()
1262 .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1263 Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1264 ))
1265 .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1266 Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1267 )),
1268 ),
1269 )
1270 .count(&*tx)
1271 .await?;
1272
1273 if count < 2 {
1274 Err(anyhow!("not room participants"))?;
1275 }
1276
1277 Ok(())
1278 })
1279 .await
1280 }
1281
1282 /// Adds the given follower connection as a follower of the given leader connection.
1283 pub async fn follow(
1284 &self,
1285 room_id: RoomId,
1286 project_id: ProjectId,
1287 leader_connection: ConnectionId,
1288 follower_connection: ConnectionId,
1289 ) -> Result<TransactionGuard<proto::Room>> {
1290 self.room_transaction(room_id, |tx| async move {
1291 follower::ActiveModel {
1292 room_id: ActiveValue::set(room_id),
1293 project_id: ActiveValue::set(project_id),
1294 leader_connection_server_id: ActiveValue::set(ServerId(
1295 leader_connection.owner_id as i32,
1296 )),
1297 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1298 follower_connection_server_id: ActiveValue::set(ServerId(
1299 follower_connection.owner_id as i32,
1300 )),
1301 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1302 ..Default::default()
1303 }
1304 .insert(&*tx)
1305 .await?;
1306
1307 let room = self.get_room(room_id, &tx).await?;
1308 Ok(room)
1309 })
1310 .await
1311 }
1312
1313 /// Removes the given follower connection as a follower of the given leader connection.
1314 pub async fn unfollow(
1315 &self,
1316 room_id: RoomId,
1317 project_id: ProjectId,
1318 leader_connection: ConnectionId,
1319 follower_connection: ConnectionId,
1320 ) -> Result<TransactionGuard<proto::Room>> {
1321 self.room_transaction(room_id, |tx| async move {
1322 follower::Entity::delete_many()
1323 .filter(
1324 Condition::all()
1325 .add(follower::Column::RoomId.eq(room_id))
1326 .add(follower::Column::ProjectId.eq(project_id))
1327 .add(
1328 follower::Column::LeaderConnectionServerId
1329 .eq(leader_connection.owner_id),
1330 )
1331 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1332 .add(
1333 follower::Column::FollowerConnectionServerId
1334 .eq(follower_connection.owner_id),
1335 )
1336 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1337 )
1338 .exec(&*tx)
1339 .await?;
1340
1341 let room = self.get_room(room_id, &tx).await?;
1342 Ok(room)
1343 })
1344 .await
1345 }
1346}