1use util::ResultExt;
2
3use super::*;
4
5impl Database {
6 /// Returns the count of all projects, excluding ones marked as admin.
7 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
8 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
9 enum QueryAs {
10 Count,
11 }
12
13 self.transaction(|tx| async move {
14 Ok(project::Entity::find()
15 .select_only()
16 .column_as(project::Column::Id.count(), QueryAs::Count)
17 .inner_join(user::Entity)
18 .filter(user::Column::Admin.eq(false))
19 .into_values::<_, QueryAs>()
20 .one(&*tx)
21 .await?
22 .unwrap_or(0i64) as usize)
23 })
24 .await
25 }
26
    /// Shares a project with the given room.
    ///
    /// The caller is identified by `connection`, which must match the answering connection
    /// of a participant in `room_id` whose role (treated as `ChannelRole::Member` when unset)
    /// can edit projects.
    ///
    /// When `dev_server_project_id` is provided, the existing project row for that dev server
    /// project is attached to the room instead of creating a new one. This requires that the
    /// related dev server belongs to the participant and that the project is not already in a
    /// room; `worktrees` and `is_ssh_project` are not used on this path.
    ///
    /// Otherwise a new project row is inserted with the participant as host, followed by the
    /// given worktrees and a host collaborator entry.
    ///
    /// Returns the project id together with the room produced by `get_room`.
    pub async fn share_project(
        &self,
        room_id: RoomId,
        connection: ConnectionId,
        worktrees: &[proto::WorktreeMetadata],
        is_ssh_project: bool,
        dev_server_project_id: Option<DevServerProjectId>,
    ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
        self.room_transaction(room_id, |tx| async move {
            // Look up the participant by the full (connection id, server id) pair.
            let participant = room_participant::Entity::find()
                .filter(
                    Condition::all()
                        .add(
                            room_participant::Column::AnsweringConnectionId
                                .eq(connection.id as i32),
                        )
                        .add(
                            room_participant::Column::AnsweringConnectionServerId
                                .eq(connection.owner_id as i32),
                        ),
                )
                .one(&*tx)
                .await?
                .ok_or_else(|| anyhow!("could not find participant"))?;
            // The participant was found by connection only, so verify it is in the requested room.
            if participant.room_id != room_id {
                return Err(anyhow!("shared project on unexpected room"))?;
            }
            // A participant without an explicit role is treated as a member.
            if !participant
                .role
                .unwrap_or(ChannelRole::Member)
                .can_edit_projects()
            {
                return Err(anyhow!("guests cannot share projects"))?;
            }

            // Dev server path: attach the already-existing project row to this room.
            if let Some(dev_server_project_id) = dev_server_project_id {
                let project = project::Entity::find()
                    .filter(project::Column::DevServerProjectId.eq(Some(dev_server_project_id)))
                    .one(&*tx)
                    .await?
                    .ok_or_else(|| anyhow!("no remote project"))?;

                let (_, dev_server) = dev_server_project::Entity::find_by_id(dev_server_project_id)
                    .find_also_related(dev_server::Entity)
                    .one(&*tx)
                    .await?
                    .ok_or_else(|| anyhow!("no dev_server_project"))?;

                // Rejects both a missing dev server and one owned by a different user.
                if !dev_server.is_some_and(|dev_server| dev_server.user_id == participant.user_id) {
                    return Err(anyhow!("not your dev server"))?;
                }

                if project.room_id.is_some() {
                    return Err(anyhow!("project already shared"))?;
                };

                let project = project::Entity::update(project::ActiveModel {
                    room_id: ActiveValue::Set(Some(room_id)),
                    ..project.into_active_model()
                })
                .exec(&*tx)
                .await?;

                let room = self.get_room(room_id, &tx).await?;
                return Ok((project.id, room));
            }

            // Regular path: create a new project hosted by this participant's connection.
            let project = project::ActiveModel {
                room_id: ActiveValue::set(Some(participant.room_id)),
                host_user_id: ActiveValue::set(Some(participant.user_id)),
                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
                host_connection_server_id: ActiveValue::set(Some(ServerId(
                    connection.owner_id as i32,
                ))),
                id: ActiveValue::NotSet,
                hosted_project_id: ActiveValue::Set(None),
                dev_server_project_id: ActiveValue::Set(None),
            }
            .insert(&*tx)
            .await?;

            // The bulk insert is skipped when there are no worktrees to record.
            if !worktrees.is_empty() {
                worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
                    worktree::ActiveModel {
                        id: ActiveValue::set(worktree.id as i64),
                        project_id: ActiveValue::set(project.id),
                        abs_path: ActiveValue::set(worktree.abs_path.clone()),
                        root_name: ActiveValue::set(worktree.root_name.clone()),
                        visible: ActiveValue::set(worktree.visible),
                        scan_id: ActiveValue::set(0),
                        completed_scan_id: ActiveValue::set(0),
                    }
                }))
                .exec(&*tx)
                .await?;
            }

            // The host collaborator gets replica id 1 for SSH projects and 0 otherwise.
            let replica_id = if is_ssh_project { 1 } else { 0 };

            project_collaborator::ActiveModel {
                project_id: ActiveValue::set(project.id),
                connection_id: ActiveValue::set(connection.id as i32),
                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
                user_id: ActiveValue::set(participant.user_id),
                replica_id: ActiveValue::set(ReplicaId(replica_id)),
                is_host: ActiveValue::set(true),
                ..Default::default()
            }
            .insert(&*tx)
            .await?;

            let room = self.get_room(room_id, &tx).await?;
            Ok((project.id, room))
        })
        .await
    }
144
145 pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
146 self.weak_transaction(|tx| async move {
147 project::Entity::delete_by_id(project_id).exec(&*tx).await?;
148 Ok(())
149 })
150 .await
151 }
152
153 /// Unshares the given project.
154 pub async fn unshare_project(
155 &self,
156 project_id: ProjectId,
157 connection: ConnectionId,
158 user_id: Option<UserId>,
159 ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
160 self.project_transaction(project_id, |tx| async move {
161 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
162 let project = project::Entity::find_by_id(project_id)
163 .one(&*tx)
164 .await?
165 .ok_or_else(|| anyhow!("project not found"))?;
166 let room = if let Some(room_id) = project.room_id {
167 Some(self.get_room(room_id, &tx).await?)
168 } else {
169 None
170 };
171 if project.host_connection()? == connection {
172 return Ok((true, room, guest_connection_ids));
173 }
174 if let Some(dev_server_project_id) = project.dev_server_project_id {
175 if let Some(user_id) = user_id {
176 if user_id
177 != self
178 .owner_for_dev_server_project(dev_server_project_id, &tx)
179 .await?
180 {
181 Err(anyhow!("cannot unshare a project hosted by another user"))?
182 }
183 project::Entity::update(project::ActiveModel {
184 room_id: ActiveValue::Set(None),
185 ..project.into_active_model()
186 })
187 .exec(&*tx)
188 .await?;
189 return Ok((false, room, guest_connection_ids));
190 }
191 }
192
193 Err(anyhow!("cannot unshare a project hosted by another user"))?
194 })
195 .await
196 }
197
198 /// Updates the worktrees associated with the given project.
199 pub async fn update_project(
200 &self,
201 project_id: ProjectId,
202 connection: ConnectionId,
203 worktrees: &[proto::WorktreeMetadata],
204 ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
205 self.project_transaction(project_id, |tx| async move {
206 let project = project::Entity::find_by_id(project_id)
207 .filter(
208 Condition::all()
209 .add(project::Column::HostConnectionId.eq(connection.id as i32))
210 .add(
211 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
212 ),
213 )
214 .one(&*tx)
215 .await?
216 .ok_or_else(|| anyhow!("no such project"))?;
217
218 self.update_project_worktrees(project.id, worktrees, &tx)
219 .await?;
220
221 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
222
223 let room = if let Some(room_id) = project.room_id {
224 Some(self.get_room(room_id, &tx).await?)
225 } else {
226 None
227 };
228
229 Ok((room, guest_connection_ids))
230 })
231 .await
232 }
233
234 pub(in crate::db) async fn update_project_worktrees(
235 &self,
236 project_id: ProjectId,
237 worktrees: &[proto::WorktreeMetadata],
238 tx: &DatabaseTransaction,
239 ) -> Result<()> {
240 if !worktrees.is_empty() {
241 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
242 id: ActiveValue::set(worktree.id as i64),
243 project_id: ActiveValue::set(project_id),
244 abs_path: ActiveValue::set(worktree.abs_path.clone()),
245 root_name: ActiveValue::set(worktree.root_name.clone()),
246 visible: ActiveValue::set(worktree.visible),
247 scan_id: ActiveValue::set(0),
248 completed_scan_id: ActiveValue::set(0),
249 }))
250 .on_conflict(
251 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
252 .update_column(worktree::Column::RootName)
253 .to_owned(),
254 )
255 .exec(tx)
256 .await?;
257 }
258
259 worktree::Entity::delete_many()
260 .filter(worktree::Column::ProjectId.eq(project_id).and(
261 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
262 ))
263 .exec(tx)
264 .await?;
265
266 Ok(())
267 }
268
    /// Applies a worktree update sent by the project's host.
    ///
    /// The project is only found when `connection` matches its stored host connection id and
    /// host server id. The update then:
    /// 1. writes the worktree's metadata (`root_name`, `abs_path`, `scan_id`, and
    ///    `completed_scan_id` when `update.is_last_update` is set),
    /// 2. upserts `update.updated_entries`,
    /// 3. marks `update.removed_entries` as deleted,
    /// 4. upserts `update.updated_repositories`,
    /// 5. marks `update.removed_repositories` as deleted.
    ///
    /// Returns the guest connection ids of the project.
    pub async fn update_worktree(
        &self,
        update: &proto::UpdateWorktree,
        connection: ConnectionId,
    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
        let project_id = ProjectId::from_proto(update.project_id);
        let worktree_id = update.worktree_id as i64;
        self.project_transaction(project_id, |tx| async move {
            // Ensure the update comes from the host.
            let _project = project::Entity::find_by_id(project_id)
                .filter(
                    Condition::all()
                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
                        .add(
                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
                        ),
                )
                .one(&*tx)
                .await?
                .ok_or_else(|| anyhow!("no such project: {project_id}"))?;

            // Update metadata.
            worktree::Entity::update(worktree::ActiveModel {
                id: ActiveValue::set(worktree_id),
                project_id: ActiveValue::set(project_id),
                root_name: ActiveValue::set(update.root_name.clone()),
                scan_id: ActiveValue::set(update.scan_id as i64),
                // Only the last update of a scan records the scan as completed; otherwise
                // the column is given `ActiveValue::default()`.
                completed_scan_id: if update.is_last_update {
                    ActiveValue::set(update.scan_id as i64)
                } else {
                    ActiveValue::default()
                },
                abs_path: ActiveValue::set(update.abs_path.clone()),
                ..Default::default()
            })
            .exec(&*tx)
            .await?;

            // Upsert added or changed entries, stamping them with the current scan id.
            if !update.updated_entries.is_empty() {
                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
                    // A missing mtime falls back to the timestamp type's default value.
                    let mtime = entry.mtime.clone().unwrap_or_default();
                    worktree_entry::ActiveModel {
                        project_id: ActiveValue::set(project_id),
                        worktree_id: ActiveValue::set(worktree_id),
                        id: ActiveValue::set(entry.id as i64),
                        is_dir: ActiveValue::set(entry.is_dir),
                        path: ActiveValue::set(entry.path.clone()),
                        inode: ActiveValue::set(entry.inode as i64),
                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
                        is_symlink: ActiveValue::set(entry.is_symlink),
                        is_ignored: ActiveValue::set(entry.is_ignored),
                        is_external: ActiveValue::set(entry.is_external),
                        git_status: ActiveValue::set(entry.git_status.map(|status| status as i64)),
                        is_deleted: ActiveValue::set(false),
                        scan_id: ActiveValue::set(update.scan_id as i64),
                        is_fifo: ActiveValue::set(entry.is_fifo),
                    }
                }))
                .on_conflict(
                    OnConflict::columns([
                        worktree_entry::Column::ProjectId,
                        worktree_entry::Column::WorktreeId,
                        worktree_entry::Column::Id,
                    ])
                    // NOTE(review): `is_external`, `is_deleted` and `is_fifo` are set on insert
                    // but are not in this update list — confirm that is intentional.
                    .update_columns([
                        worktree_entry::Column::IsDir,
                        worktree_entry::Column::Path,
                        worktree_entry::Column::Inode,
                        worktree_entry::Column::MtimeSeconds,
                        worktree_entry::Column::MtimeNanos,
                        worktree_entry::Column::IsSymlink,
                        worktree_entry::Column::IsIgnored,
                        worktree_entry::Column::GitStatus,
                        worktree_entry::Column::ScanId,
                    ])
                    .to_owned(),
                )
                .exec(&*tx)
                .await?;
            }

            // Removed entries are flagged as deleted (and stamped with the scan id) rather
            // than having their rows removed.
            if !update.removed_entries.is_empty() {
                worktree_entry::Entity::update_many()
                    .filter(
                        worktree_entry::Column::ProjectId
                            .eq(project_id)
                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
                            .and(
                                worktree_entry::Column::Id
                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
                            ),
                    )
                    .set(worktree_entry::ActiveModel {
                        is_deleted: ActiveValue::Set(true),
                        scan_id: ActiveValue::Set(update.scan_id as i64),
                        ..Default::default()
                    })
                    .exec(&*tx)
                    .await?;
            }

            // Upsert repositories; on conflict only the scan id and branch are refreshed.
            if !update.updated_repositories.is_empty() {
                worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
                    |repository| worktree_repository::ActiveModel {
                        project_id: ActiveValue::set(project_id),
                        worktree_id: ActiveValue::set(worktree_id),
                        work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
                        scan_id: ActiveValue::set(update.scan_id as i64),
                        branch: ActiveValue::set(repository.branch.clone()),
                        is_deleted: ActiveValue::set(false),
                    },
                ))
                .on_conflict(
                    OnConflict::columns([
                        worktree_repository::Column::ProjectId,
                        worktree_repository::Column::WorktreeId,
                        worktree_repository::Column::WorkDirectoryId,
                    ])
                    .update_columns([
                        worktree_repository::Column::ScanId,
                        worktree_repository::Column::Branch,
                    ])
                    .to_owned(),
                )
                .exec(&*tx)
                .await?;
            }

            // Removed repositories are flagged as deleted in the same way as removed entries.
            if !update.removed_repositories.is_empty() {
                worktree_repository::Entity::update_many()
                    .filter(
                        worktree_repository::Column::ProjectId
                            .eq(project_id)
                            .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
                            .and(
                                worktree_repository::Column::WorkDirectoryId
                                    .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
                            ),
                    )
                    .set(worktree_repository::ActiveModel {
                        is_deleted: ActiveValue::Set(true),
                        scan_id: ActiveValue::Set(update.scan_id as i64),
                        ..Default::default()
                    })
                    .exec(&*tx)
                    .await?;
            }

            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
            Ok(connection_ids)
        })
        .await
    }
423
424 /// Updates the diagnostic summary for the given connection.
425 pub async fn update_diagnostic_summary(
426 &self,
427 update: &proto::UpdateDiagnosticSummary,
428 connection: ConnectionId,
429 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
430 let project_id = ProjectId::from_proto(update.project_id);
431 let worktree_id = update.worktree_id as i64;
432 self.project_transaction(project_id, |tx| async move {
433 let summary = update
434 .summary
435 .as_ref()
436 .ok_or_else(|| anyhow!("invalid summary"))?;
437
438 // Ensure the update comes from the host.
439 let project = project::Entity::find_by_id(project_id)
440 .one(&*tx)
441 .await?
442 .ok_or_else(|| anyhow!("no such project"))?;
443 if project.host_connection()? != connection {
444 return Err(anyhow!("can't update a project hosted by someone else"))?;
445 }
446
447 // Update summary.
448 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
449 project_id: ActiveValue::set(project_id),
450 worktree_id: ActiveValue::set(worktree_id),
451 path: ActiveValue::set(summary.path.clone()),
452 language_server_id: ActiveValue::set(summary.language_server_id as i64),
453 error_count: ActiveValue::set(summary.error_count as i32),
454 warning_count: ActiveValue::set(summary.warning_count as i32),
455 })
456 .on_conflict(
457 OnConflict::columns([
458 worktree_diagnostic_summary::Column::ProjectId,
459 worktree_diagnostic_summary::Column::WorktreeId,
460 worktree_diagnostic_summary::Column::Path,
461 ])
462 .update_columns([
463 worktree_diagnostic_summary::Column::LanguageServerId,
464 worktree_diagnostic_summary::Column::ErrorCount,
465 worktree_diagnostic_summary::Column::WarningCount,
466 ])
467 .to_owned(),
468 )
469 .exec(&*tx)
470 .await?;
471
472 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
473 Ok(connection_ids)
474 })
475 .await
476 }
477
478 /// Starts the language server for the given connection.
479 pub async fn start_language_server(
480 &self,
481 update: &proto::StartLanguageServer,
482 connection: ConnectionId,
483 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
484 let project_id = ProjectId::from_proto(update.project_id);
485 self.project_transaction(project_id, |tx| async move {
486 let server = update
487 .server
488 .as_ref()
489 .ok_or_else(|| anyhow!("invalid language server"))?;
490
491 // Ensure the update comes from the host.
492 let project = project::Entity::find_by_id(project_id)
493 .one(&*tx)
494 .await?
495 .ok_or_else(|| anyhow!("no such project"))?;
496 if project.host_connection()? != connection {
497 return Err(anyhow!("can't update a project hosted by someone else"))?;
498 }
499
500 // Add the newly-started language server.
501 language_server::Entity::insert(language_server::ActiveModel {
502 project_id: ActiveValue::set(project_id),
503 id: ActiveValue::set(server.id as i64),
504 name: ActiveValue::set(server.name.clone()),
505 })
506 .on_conflict(
507 OnConflict::columns([
508 language_server::Column::ProjectId,
509 language_server::Column::Id,
510 ])
511 .update_column(language_server::Column::Name)
512 .to_owned(),
513 )
514 .exec(&*tx)
515 .await?;
516
517 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
518 Ok(connection_ids)
519 })
520 .await
521 }
522
523 /// Updates the worktree settings for the given connection.
524 pub async fn update_worktree_settings(
525 &self,
526 update: &proto::UpdateWorktreeSettings,
527 connection: ConnectionId,
528 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
529 let project_id = ProjectId::from_proto(update.project_id);
530 self.project_transaction(project_id, |tx| async move {
531 // Ensure the update comes from the host.
532 let project = project::Entity::find_by_id(project_id)
533 .one(&*tx)
534 .await?
535 .ok_or_else(|| anyhow!("no such project"))?;
536 if project.host_connection()? != connection {
537 return Err(anyhow!("can't update a project hosted by someone else"))?;
538 }
539
540 if let Some(content) = &update.content {
541 worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
542 project_id: ActiveValue::Set(project_id),
543 worktree_id: ActiveValue::Set(update.worktree_id as i64),
544 path: ActiveValue::Set(update.path.clone()),
545 content: ActiveValue::Set(content.clone()),
546 })
547 .on_conflict(
548 OnConflict::columns([
549 worktree_settings_file::Column::ProjectId,
550 worktree_settings_file::Column::WorktreeId,
551 worktree_settings_file::Column::Path,
552 ])
553 .update_column(worktree_settings_file::Column::Content)
554 .to_owned(),
555 )
556 .exec(&*tx)
557 .await?;
558 } else {
559 worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
560 project_id: ActiveValue::Set(project_id),
561 worktree_id: ActiveValue::Set(update.worktree_id as i64),
562 path: ActiveValue::Set(update.path.clone()),
563 ..Default::default()
564 })
565 .exec(&*tx)
566 .await?;
567 }
568
569 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
570 Ok(connection_ids)
571 })
572 .await
573 }
574
575 /// Adds the given connection to the specified hosted project
576 pub async fn join_hosted_project(
577 &self,
578 id: ProjectId,
579 user_id: UserId,
580 connection: ConnectionId,
581 ) -> Result<(Project, ReplicaId)> {
582 self.transaction(|tx| async move {
583 let (project, hosted_project) = project::Entity::find_by_id(id)
584 .find_also_related(hosted_project::Entity)
585 .one(&*tx)
586 .await?
587 .ok_or_else(|| anyhow!("hosted project is no longer shared"))?;
588
589 let Some(hosted_project) = hosted_project else {
590 return Err(anyhow!("project is not hosted"))?;
591 };
592
593 let channel = channel::Entity::find_by_id(hosted_project.channel_id)
594 .one(&*tx)
595 .await?
596 .ok_or_else(|| anyhow!("no such channel"))?;
597
598 let role = self
599 .check_user_is_channel_participant(&channel, user_id, &tx)
600 .await?;
601
602 self.join_project_internal(project, user_id, connection, role, &tx)
603 .await
604 })
605 .await
606 }
607
608 pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
609 self.transaction(|tx| async move {
610 Ok(project::Entity::find_by_id(id)
611 .one(&*tx)
612 .await?
613 .ok_or_else(|| anyhow!("no such project"))?)
614 })
615 .await
616 }
617
618 pub async fn find_dev_server_project(&self, id: DevServerProjectId) -> Result<project::Model> {
619 self.transaction(|tx| async move {
620 Ok(project::Entity::find()
621 .filter(project::Column::DevServerProjectId.eq(id))
622 .one(&*tx)
623 .await?
624 .ok_or_else(|| anyhow!("no such project"))?)
625 })
626 .await
627 }
628
629 /// Adds the given connection to the specified project
630 /// in the current room.
631 pub async fn join_project(
632 &self,
633 project_id: ProjectId,
634 connection: ConnectionId,
635 user_id: UserId,
636 ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
637 self.project_transaction(project_id, |tx| async move {
638 let (project, role) = self
639 .access_project(
640 project_id,
641 connection,
642 PrincipalId::UserId(user_id),
643 Capability::ReadOnly,
644 &tx,
645 )
646 .await?;
647 self.join_project_internal(project, user_id, connection, role, &tx)
648 .await
649 })
650 .await
651 }
652
    /// Inserts a non-host collaborator for `connection` and assembles the project's state.
    ///
    /// The new collaborator receives the smallest replica id, starting at 1, that no existing
    /// collaborator of the project uses. The returned `Project` contains all collaborators
    /// (including the new one), the project's worktrees populated with their non-deleted
    /// entries, non-deleted repository entries, diagnostic summaries and settings files, and
    /// the project's language servers. `role` is passed through unchanged.
    ///
    /// Returns the assembled project together with the replica id assigned to the joiner.
    async fn join_project_internal(
        &self,
        project: project::Model,
        user_id: UserId,
        connection: ConnectionId,
        role: ChannelRole,
        tx: &DatabaseTransaction,
    ) -> Result<(Project, ReplicaId)> {
        let mut collaborators = project
            .find_related(project_collaborator::Entity)
            .all(tx)
            .await?;
        // Pick the lowest replica id >= 1 that is not already taken.
        let replica_ids = collaborators
            .iter()
            .map(|c| c.replica_id)
            .collect::<HashSet<_>>();
        let mut replica_id = ReplicaId(1);
        while replica_ids.contains(&replica_id) {
            replica_id.0 += 1;
        }
        let new_collaborator = project_collaborator::ActiveModel {
            project_id: ActiveValue::set(project.id),
            connection_id: ActiveValue::set(connection.id as i32),
            connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
            user_id: ActiveValue::set(user_id),
            replica_id: ActiveValue::set(replica_id),
            is_host: ActiveValue::set(false),
            ..Default::default()
        }
        .insert(tx)
        .await?;
        collaborators.push(new_collaborator);

        // Index worktrees by id so the rows streamed below can be attached to them.
        let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
        let mut worktrees = db_worktrees
            .into_iter()
            .map(|db_worktree| {
                (
                    db_worktree.id as u64,
                    Worktree {
                        id: db_worktree.id as u64,
                        abs_path: db_worktree.abs_path,
                        root_name: db_worktree.root_name,
                        visible: db_worktree.visible,
                        entries: Default::default(),
                        repository_entries: Default::default(),
                        diagnostic_summaries: Default::default(),
                        settings_files: Default::default(),
                        scan_id: db_worktree.scan_id as u64,
                        completed_scan_id: db_worktree.completed_scan_id as u64,
                    },
                )
            })
            .collect::<BTreeMap<_, _>>();

        // Populate worktree entries.
        // Rows whose worktree id is not in the map are skipped.
        {
            let mut db_entries = worktree_entry::Entity::find()
                .filter(
                    Condition::all()
                        .add(worktree_entry::Column::ProjectId.eq(project.id))
                        .add(worktree_entry::Column::IsDeleted.eq(false)),
                )
                .stream(tx)
                .await?;
            while let Some(db_entry) = db_entries.next().await {
                let db_entry = db_entry?;
                if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
                    worktree.entries.push(proto::Entry {
                        id: db_entry.id as u64,
                        is_dir: db_entry.is_dir,
                        path: db_entry.path,
                        inode: db_entry.inode as u64,
                        mtime: Some(proto::Timestamp {
                            seconds: db_entry.mtime_seconds as u64,
                            nanos: db_entry.mtime_nanos as u32,
                        }),
                        is_symlink: db_entry.is_symlink,
                        is_ignored: db_entry.is_ignored,
                        is_external: db_entry.is_external,
                        git_status: db_entry.git_status.map(|status| status as i32),
                        // This is only used in the summarization backlog, so if it's None,
                        // that just means we won't be able to detect when to resummarize
                        // based on total number of backlogged bytes - instead, we'd go
                        // on number of files only. That shouldn't be a huge deal in practice.
                        size: None,
                        is_fifo: db_entry.is_fifo,
                    });
                }
            }
        }

        // Populate repository entries.
        // As with entries, only non-deleted rows for known worktrees are used.
        {
            let mut db_repository_entries = worktree_repository::Entity::find()
                .filter(
                    Condition::all()
                        .add(worktree_repository::Column::ProjectId.eq(project.id))
                        .add(worktree_repository::Column::IsDeleted.eq(false)),
                )
                .stream(tx)
                .await?;
            while let Some(db_repository_entry) = db_repository_entries.next().await {
                let db_repository_entry = db_repository_entry?;
                if let Some(worktree) = worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
                {
                    worktree.repository_entries.insert(
                        db_repository_entry.work_directory_id as u64,
                        proto::RepositoryEntry {
                            work_directory_id: db_repository_entry.work_directory_id as u64,
                            branch: db_repository_entry.branch,
                        },
                    );
                }
            }
        }

        // Populate worktree diagnostic summaries.
        {
            let mut db_summaries = worktree_diagnostic_summary::Entity::find()
                .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
                .stream(tx)
                .await?;
            while let Some(db_summary) = db_summaries.next().await {
                let db_summary = db_summary?;
                if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
                    worktree
                        .diagnostic_summaries
                        .push(proto::DiagnosticSummary {
                            path: db_summary.path,
                            language_server_id: db_summary.language_server_id as u64,
                            error_count: db_summary.error_count as u32,
                            warning_count: db_summary.warning_count as u32,
                        });
                }
            }
        }

        // Populate worktree settings files.
        {
            let mut db_settings_files = worktree_settings_file::Entity::find()
                .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
                .stream(tx)
                .await?;
            while let Some(db_settings_file) = db_settings_files.next().await {
                let db_settings_file = db_settings_file?;
                if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
                    worktree.settings_files.push(WorktreeSettingsFile {
                        path: db_settings_file.path,
                        content: db_settings_file.content,
                    });
                }
            }
        }

        // Populate language servers.
        let language_servers = project
            .find_related(language_server::Entity)
            .all(tx)
            .await?;

        // Assemble the project state that is returned to the caller.
        let project = Project {
            id: project.id,
            role,
            collaborators: collaborators
                .into_iter()
                .map(|collaborator| ProjectCollaborator {
                    connection_id: collaborator.connection(),
                    user_id: collaborator.user_id,
                    replica_id: collaborator.replica_id,
                    is_host: collaborator.is_host,
                })
                .collect(),
            worktrees,
            language_servers: language_servers
                .into_iter()
                .map(|language_server| proto::LanguageServer {
                    id: language_server.id as u64,
                    name: language_server.name,
                })
                .collect(),
            dev_server_project_id: project.dev_server_project_id,
        };
        Ok((project, replica_id as ReplicaId))
    }
838
839 pub async fn leave_hosted_project(
840 &self,
841 project_id: ProjectId,
842 connection: ConnectionId,
843 ) -> Result<LeftProject> {
844 self.transaction(|tx| async move {
845 let result = project_collaborator::Entity::delete_many()
846 .filter(
847 Condition::all()
848 .add(project_collaborator::Column::ProjectId.eq(project_id))
849 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
850 .add(
851 project_collaborator::Column::ConnectionServerId
852 .eq(connection.owner_id as i32),
853 ),
854 )
855 .exec(&*tx)
856 .await?;
857 if result.rows_affected == 0 {
858 return Err(anyhow!("not in the project"))?;
859 }
860
861 let project = project::Entity::find_by_id(project_id)
862 .one(&*tx)
863 .await?
864 .ok_or_else(|| anyhow!("no such project"))?;
865 let collaborators = project
866 .find_related(project_collaborator::Entity)
867 .all(&*tx)
868 .await?;
869 let connection_ids = collaborators
870 .into_iter()
871 .map(|collaborator| collaborator.connection())
872 .collect();
873 Ok(LeftProject {
874 id: project.id,
875 connection_ids,
876 should_unshare: false,
877 })
878 })
879 .await
880 }
881
882 /// Removes the given connection from the specified project.
883 pub async fn leave_project(
884 &self,
885 project_id: ProjectId,
886 connection: ConnectionId,
887 ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
888 self.project_transaction(project_id, |tx| async move {
889 let result = project_collaborator::Entity::delete_many()
890 .filter(
891 Condition::all()
892 .add(project_collaborator::Column::ProjectId.eq(project_id))
893 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
894 .add(
895 project_collaborator::Column::ConnectionServerId
896 .eq(connection.owner_id as i32),
897 ),
898 )
899 .exec(&*tx)
900 .await?;
901 if result.rows_affected == 0 {
902 Err(anyhow!("not a collaborator on this project"))?;
903 }
904
905 let project = project::Entity::find_by_id(project_id)
906 .one(&*tx)
907 .await?
908 .ok_or_else(|| anyhow!("no such project"))?;
909 let collaborators = project
910 .find_related(project_collaborator::Entity)
911 .all(&*tx)
912 .await?;
913 let connection_ids: Vec<ConnectionId> = collaborators
914 .into_iter()
915 .map(|collaborator| collaborator.connection())
916 .collect();
917
918 follower::Entity::delete_many()
919 .filter(
920 Condition::any()
921 .add(
922 Condition::all()
923 .add(follower::Column::ProjectId.eq(Some(project_id)))
924 .add(
925 follower::Column::LeaderConnectionServerId
926 .eq(connection.owner_id),
927 )
928 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
929 )
930 .add(
931 Condition::all()
932 .add(follower::Column::ProjectId.eq(Some(project_id)))
933 .add(
934 follower::Column::FollowerConnectionServerId
935 .eq(connection.owner_id),
936 )
937 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
938 ),
939 )
940 .exec(&*tx)
941 .await?;
942
943 let room = if let Some(room_id) = project.room_id {
944 Some(self.get_room(room_id, &tx).await?)
945 } else {
946 None
947 };
948
949 let left_project = LeftProject {
950 id: project_id,
951 should_unshare: connection == project.host_connection()?,
952 connection_ids,
953 };
954 Ok((room, left_project))
955 })
956 .await
957 }
958
959 pub async fn check_user_is_project_host(
960 &self,
961 project_id: ProjectId,
962 connection_id: ConnectionId,
963 ) -> Result<()> {
964 self.project_transaction(project_id, |tx| async move {
965 project::Entity::find()
966 .filter(
967 Condition::all()
968 .add(project::Column::Id.eq(project_id))
969 .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
970 .add(
971 project::Column::HostConnectionServerId
972 .eq(Some(connection_id.owner_id as i32)),
973 ),
974 )
975 .one(&*tx)
976 .await?
977 .ok_or_else(|| anyhow!("failed to read project host"))?;
978
979 Ok(())
980 })
981 .await
982 .map(|guard| guard.into_inner())
983 }
984
985 /// Returns the current project if the given user is authorized to access it with the specified capability.
986 pub async fn access_project(
987 &self,
988 project_id: ProjectId,
989 connection_id: ConnectionId,
990 principal_id: PrincipalId,
991 capability: Capability,
992 tx: &DatabaseTransaction,
993 ) -> Result<(project::Model, ChannelRole)> {
994 let (mut project, dev_server_project) = project::Entity::find_by_id(project_id)
995 .find_also_related(dev_server_project::Entity)
996 .one(tx)
997 .await?
998 .ok_or_else(|| anyhow!("no such project"))?;
999
1000 let user_id = match principal_id {
1001 PrincipalId::DevServerId(_) => {
1002 if project
1003 .host_connection()
1004 .is_ok_and(|connection| connection == connection_id)
1005 {
1006 return Ok((project, ChannelRole::Admin));
1007 }
1008 return Err(anyhow!("not the project host"))?;
1009 }
1010 PrincipalId::UserId(user_id) => user_id,
1011 };
1012
1013 let role_from_room = if let Some(room_id) = project.room_id {
1014 room_participant::Entity::find()
1015 .filter(room_participant::Column::RoomId.eq(room_id))
1016 .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
1017 .one(tx)
1018 .await?
1019 .and_then(|participant| participant.role)
1020 } else {
1021 None
1022 };
1023 let role_from_dev_server = if let Some(dev_server_project) = dev_server_project {
1024 let dev_server = dev_server::Entity::find_by_id(dev_server_project.dev_server_id)
1025 .one(tx)
1026 .await?
1027 .ok_or_else(|| anyhow!("no such channel"))?;
1028 if user_id == dev_server.user_id {
1029 // If the user left the room "uncleanly" they may rejoin the
1030 // remote project before leave_room runs. IN that case kick
1031 // the project out of the room pre-emptively.
1032 if role_from_room.is_none() {
1033 project = project::Entity::update(project::ActiveModel {
1034 room_id: ActiveValue::Set(None),
1035 ..project.into_active_model()
1036 })
1037 .exec(tx)
1038 .await?;
1039 }
1040 Some(ChannelRole::Admin)
1041 } else {
1042 None
1043 }
1044 } else {
1045 None
1046 };
1047
1048 let role = role_from_dev_server
1049 .or(role_from_room)
1050 .unwrap_or(ChannelRole::Banned);
1051
1052 match capability {
1053 Capability::ReadWrite => {
1054 if !role.can_edit_projects() {
1055 return Err(anyhow!("not authorized to edit projects"))?;
1056 }
1057 }
1058 Capability::ReadOnly => {
1059 if !role.can_read_projects() {
1060 return Err(anyhow!("not authorized to read projects"))?;
1061 }
1062 }
1063 }
1064
1065 Ok((project, role))
1066 }
1067
1068 /// Returns the host connection for a read-only request to join a shared project.
1069 pub async fn host_for_read_only_project_request(
1070 &self,
1071 project_id: ProjectId,
1072 connection_id: ConnectionId,
1073 user_id: UserId,
1074 ) -> Result<ConnectionId> {
1075 self.project_transaction(project_id, |tx| async move {
1076 let (project, _) = self
1077 .access_project(
1078 project_id,
1079 connection_id,
1080 PrincipalId::UserId(user_id),
1081 Capability::ReadOnly,
1082 &tx,
1083 )
1084 .await?;
1085 project.host_connection()
1086 })
1087 .await
1088 .map(|guard| guard.into_inner())
1089 }
1090
1091 /// Returns the host connection for a request to join a shared project.
1092 pub async fn host_for_mutating_project_request(
1093 &self,
1094 project_id: ProjectId,
1095 connection_id: ConnectionId,
1096 user_id: UserId,
1097 ) -> Result<ConnectionId> {
1098 self.project_transaction(project_id, |tx| async move {
1099 let (project, _) = self
1100 .access_project(
1101 project_id,
1102 connection_id,
1103 PrincipalId::UserId(user_id),
1104 Capability::ReadWrite,
1105 &tx,
1106 )
1107 .await?;
1108 project.host_connection()
1109 })
1110 .await
1111 .map(|guard| guard.into_inner())
1112 }
1113
1114 /// Returns the host connection for a request to join a shared project.
1115 pub async fn host_for_owner_project_request(
1116 &self,
1117 project_id: ProjectId,
1118 _connection_id: ConnectionId,
1119 user_id: UserId,
1120 ) -> Result<ConnectionId> {
1121 self.project_transaction(project_id, |tx| async move {
1122 let (project, dev_server_project) = project::Entity::find_by_id(project_id)
1123 .find_also_related(dev_server_project::Entity)
1124 .one(&*tx)
1125 .await?
1126 .ok_or_else(|| anyhow!("no such project"))?;
1127
1128 let Some(dev_server_project) = dev_server_project else {
1129 return Err(anyhow!("not a dev server project"))?;
1130 };
1131 let dev_server = dev_server::Entity::find_by_id(dev_server_project.dev_server_id)
1132 .one(&*tx)
1133 .await?
1134 .ok_or_else(|| anyhow!("no such dev server"))?;
1135 if dev_server.user_id != user_id {
1136 return Err(anyhow!("not your project"))?;
1137 }
1138 project.host_connection()
1139 })
1140 .await
1141 .map(|guard| guard.into_inner())
1142 }
1143
1144 pub async fn connections_for_buffer_update(
1145 &self,
1146 project_id: ProjectId,
1147 principal_id: PrincipalId,
1148 connection_id: ConnectionId,
1149 capability: Capability,
1150 ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1151 self.project_transaction(project_id, |tx| async move {
1152 // Authorize
1153 let (project, _) = self
1154 .access_project(project_id, connection_id, principal_id, capability, &tx)
1155 .await?;
1156
1157 let host_connection_id = project.host_connection()?;
1158
1159 let collaborators = project_collaborator::Entity::find()
1160 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1161 .all(&*tx)
1162 .await?;
1163
1164 let guest_connection_ids = collaborators
1165 .into_iter()
1166 .filter_map(|collaborator| {
1167 if collaborator.is_host {
1168 None
1169 } else {
1170 Some(collaborator.connection())
1171 }
1172 })
1173 .collect();
1174
1175 Ok((host_connection_id, guest_connection_ids))
1176 })
1177 .await
1178 }
1179
1180 /// Returns the connection IDs in the given project.
1181 ///
1182 /// The provided `connection_id` must also be a collaborator in the project,
1183 /// otherwise an error will be returned.
1184 pub async fn project_connection_ids(
1185 &self,
1186 project_id: ProjectId,
1187 connection_id: ConnectionId,
1188 exclude_dev_server: bool,
1189 ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1190 self.project_transaction(project_id, |tx| async move {
1191 let project = project::Entity::find_by_id(project_id)
1192 .one(&*tx)
1193 .await?
1194 .ok_or_else(|| anyhow!("no such project"))?;
1195
1196 let mut collaborators = project_collaborator::Entity::find()
1197 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1198 .stream(&*tx)
1199 .await?;
1200
1201 let mut connection_ids = HashSet::default();
1202 if let Some(host_connection) = project.host_connection().log_err() {
1203 if !exclude_dev_server {
1204 connection_ids.insert(host_connection);
1205 }
1206 }
1207
1208 while let Some(collaborator) = collaborators.next().await {
1209 let collaborator = collaborator?;
1210 connection_ids.insert(collaborator.connection());
1211 }
1212
1213 if connection_ids.contains(&connection_id)
1214 || Some(connection_id) == project.host_connection().ok()
1215 {
1216 Ok(connection_ids)
1217 } else {
1218 Err(anyhow!(
1219 "can only send project updates to a project you're in"
1220 ))?
1221 }
1222 })
1223 .await
1224 }
1225
1226 async fn project_guest_connection_ids(
1227 &self,
1228 project_id: ProjectId,
1229 tx: &DatabaseTransaction,
1230 ) -> Result<Vec<ConnectionId>> {
1231 let mut collaborators = project_collaborator::Entity::find()
1232 .filter(
1233 project_collaborator::Column::ProjectId
1234 .eq(project_id)
1235 .and(project_collaborator::Column::IsHost.eq(false)),
1236 )
1237 .stream(tx)
1238 .await?;
1239
1240 let mut guest_connection_ids = Vec::new();
1241 while let Some(collaborator) = collaborators.next().await {
1242 let collaborator = collaborator?;
1243 guest_connection_ids.push(collaborator.connection());
1244 }
1245 Ok(guest_connection_ids)
1246 }
1247
1248 /// Returns the [`RoomId`] for the given project.
1249 pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1250 self.transaction(|tx| async move {
1251 Ok(project::Entity::find_by_id(project_id)
1252 .one(&*tx)
1253 .await?
1254 .and_then(|project| project.room_id))
1255 })
1256 .await
1257 }
1258
1259 pub async fn check_room_participants(
1260 &self,
1261 room_id: RoomId,
1262 leader_id: ConnectionId,
1263 follower_id: ConnectionId,
1264 ) -> Result<()> {
1265 self.transaction(|tx| async move {
1266 use room_participant::Column;
1267
1268 let count = room_participant::Entity::find()
1269 .filter(
1270 Condition::all().add(Column::RoomId.eq(room_id)).add(
1271 Condition::any()
1272 .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1273 Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1274 ))
1275 .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1276 Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1277 )),
1278 ),
1279 )
1280 .count(&*tx)
1281 .await?;
1282
1283 if count < 2 {
1284 Err(anyhow!("not room participants"))?;
1285 }
1286
1287 Ok(())
1288 })
1289 .await
1290 }
1291
1292 /// Adds the given follower connection as a follower of the given leader connection.
1293 pub async fn follow(
1294 &self,
1295 room_id: RoomId,
1296 project_id: ProjectId,
1297 leader_connection: ConnectionId,
1298 follower_connection: ConnectionId,
1299 ) -> Result<TransactionGuard<proto::Room>> {
1300 self.room_transaction(room_id, |tx| async move {
1301 follower::ActiveModel {
1302 room_id: ActiveValue::set(room_id),
1303 project_id: ActiveValue::set(project_id),
1304 leader_connection_server_id: ActiveValue::set(ServerId(
1305 leader_connection.owner_id as i32,
1306 )),
1307 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1308 follower_connection_server_id: ActiveValue::set(ServerId(
1309 follower_connection.owner_id as i32,
1310 )),
1311 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1312 ..Default::default()
1313 }
1314 .insert(&*tx)
1315 .await?;
1316
1317 let room = self.get_room(room_id, &tx).await?;
1318 Ok(room)
1319 })
1320 .await
1321 }
1322
1323 /// Removes the given follower connection as a follower of the given leader connection.
1324 pub async fn unfollow(
1325 &self,
1326 room_id: RoomId,
1327 project_id: ProjectId,
1328 leader_connection: ConnectionId,
1329 follower_connection: ConnectionId,
1330 ) -> Result<TransactionGuard<proto::Room>> {
1331 self.room_transaction(room_id, |tx| async move {
1332 follower::Entity::delete_many()
1333 .filter(
1334 Condition::all()
1335 .add(follower::Column::RoomId.eq(room_id))
1336 .add(follower::Column::ProjectId.eq(project_id))
1337 .add(
1338 follower::Column::LeaderConnectionServerId
1339 .eq(leader_connection.owner_id),
1340 )
1341 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1342 .add(
1343 follower::Column::FollowerConnectionServerId
1344 .eq(follower_connection.owner_id),
1345 )
1346 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1347 )
1348 .exec(&*tx)
1349 .await?;
1350
1351 let room = self.get_room(room_id, &tx).await?;
1352 Ok(room)
1353 })
1354 .await
1355 }
1356}