1use anyhow::Context as _;
2use util::ResultExt;
3
4use super::*;
5
6impl Database {
7 /// Returns the count of all projects, excluding ones marked as admin.
8 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
9 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
10 enum QueryAs {
11 Count,
12 }
13
14 self.transaction(|tx| async move {
15 Ok(project::Entity::find()
16 .select_only()
17 .column_as(project::Column::Id.count(), QueryAs::Count)
18 .inner_join(user::Entity)
19 .filter(user::Column::Admin.eq(false))
20 .into_values::<_, QueryAs>()
21 .one(&*tx)
22 .await?
23 .unwrap_or(0i64) as usize)
24 })
25 .await
26 }
27
28 /// Shares a project with the given room.
29 pub async fn share_project(
30 &self,
31 room_id: RoomId,
32 connection: ConnectionId,
33 worktrees: &[proto::WorktreeMetadata],
34 is_ssh_project: bool,
35 dev_server_project_id: Option<DevServerProjectId>,
36 ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
37 self.room_transaction(room_id, |tx| async move {
38 let participant = room_participant::Entity::find()
39 .filter(
40 Condition::all()
41 .add(
42 room_participant::Column::AnsweringConnectionId
43 .eq(connection.id as i32),
44 )
45 .add(
46 room_participant::Column::AnsweringConnectionServerId
47 .eq(connection.owner_id as i32),
48 ),
49 )
50 .one(&*tx)
51 .await?
52 .ok_or_else(|| anyhow!("could not find participant"))?;
53 if participant.room_id != room_id {
54 return Err(anyhow!("shared project on unexpected room"))?;
55 }
56 if !participant
57 .role
58 .unwrap_or(ChannelRole::Member)
59 .can_edit_projects()
60 {
61 return Err(anyhow!("guests cannot share projects"))?;
62 }
63
64 if let Some(dev_server_project_id) = dev_server_project_id {
65 let project = project::Entity::find()
66 .filter(project::Column::DevServerProjectId.eq(Some(dev_server_project_id)))
67 .one(&*tx)
68 .await?
69 .ok_or_else(|| anyhow!("no remote project"))?;
70
71 let (_, dev_server) = dev_server_project::Entity::find_by_id(dev_server_project_id)
72 .find_also_related(dev_server::Entity)
73 .one(&*tx)
74 .await?
75 .ok_or_else(|| anyhow!("no dev_server_project"))?;
76
77 if !dev_server.is_some_and(|dev_server| dev_server.user_id == participant.user_id) {
78 return Err(anyhow!("not your dev server"))?;
79 }
80
81 if project.room_id.is_some() {
82 return Err(anyhow!("project already shared"))?;
83 };
84
85 let project = project::Entity::update(project::ActiveModel {
86 room_id: ActiveValue::Set(Some(room_id)),
87 ..project.into_active_model()
88 })
89 .exec(&*tx)
90 .await?;
91
92 let room = self.get_room(room_id, &tx).await?;
93 return Ok((project.id, room));
94 }
95
96 let project = project::ActiveModel {
97 room_id: ActiveValue::set(Some(participant.room_id)),
98 host_user_id: ActiveValue::set(Some(participant.user_id)),
99 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
100 host_connection_server_id: ActiveValue::set(Some(ServerId(
101 connection.owner_id as i32,
102 ))),
103 id: ActiveValue::NotSet,
104 hosted_project_id: ActiveValue::Set(None),
105 dev_server_project_id: ActiveValue::Set(None),
106 }
107 .insert(&*tx)
108 .await?;
109
110 if !worktrees.is_empty() {
111 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
112 worktree::ActiveModel {
113 id: ActiveValue::set(worktree.id as i64),
114 project_id: ActiveValue::set(project.id),
115 abs_path: ActiveValue::set(worktree.abs_path.clone()),
116 root_name: ActiveValue::set(worktree.root_name.clone()),
117 visible: ActiveValue::set(worktree.visible),
118 scan_id: ActiveValue::set(0),
119 completed_scan_id: ActiveValue::set(0),
120 }
121 }))
122 .exec(&*tx)
123 .await?;
124 }
125
126 let replica_id = if is_ssh_project { 1 } else { 0 };
127
128 project_collaborator::ActiveModel {
129 project_id: ActiveValue::set(project.id),
130 connection_id: ActiveValue::set(connection.id as i32),
131 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
132 user_id: ActiveValue::set(participant.user_id),
133 replica_id: ActiveValue::set(ReplicaId(replica_id)),
134 is_host: ActiveValue::set(true),
135 ..Default::default()
136 }
137 .insert(&*tx)
138 .await?;
139
140 let room = self.get_room(room_id, &tx).await?;
141 Ok((project.id, room))
142 })
143 .await
144 }
145
146 pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
147 self.weak_transaction(|tx| async move {
148 project::Entity::delete_by_id(project_id).exec(&*tx).await?;
149 Ok(())
150 })
151 .await
152 }
153
154 /// Unshares the given project.
155 pub async fn unshare_project(
156 &self,
157 project_id: ProjectId,
158 connection: ConnectionId,
159 user_id: Option<UserId>,
160 ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
161 self.project_transaction(project_id, |tx| async move {
162 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
163 let project = project::Entity::find_by_id(project_id)
164 .one(&*tx)
165 .await?
166 .ok_or_else(|| anyhow!("project not found"))?;
167 let room = if let Some(room_id) = project.room_id {
168 Some(self.get_room(room_id, &tx).await?)
169 } else {
170 None
171 };
172 if project.host_connection()? == connection {
173 return Ok((true, room, guest_connection_ids));
174 }
175 if let Some(dev_server_project_id) = project.dev_server_project_id {
176 if let Some(user_id) = user_id {
177 if user_id
178 != self
179 .owner_for_dev_server_project(dev_server_project_id, &tx)
180 .await?
181 {
182 Err(anyhow!("cannot unshare a project hosted by another user"))?
183 }
184 project::Entity::update(project::ActiveModel {
185 room_id: ActiveValue::Set(None),
186 ..project.into_active_model()
187 })
188 .exec(&*tx)
189 .await?;
190 return Ok((false, room, guest_connection_ids));
191 }
192 }
193
194 Err(anyhow!("cannot unshare a project hosted by another user"))?
195 })
196 .await
197 }
198
199 /// Updates the worktrees associated with the given project.
200 pub async fn update_project(
201 &self,
202 project_id: ProjectId,
203 connection: ConnectionId,
204 worktrees: &[proto::WorktreeMetadata],
205 ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
206 self.project_transaction(project_id, |tx| async move {
207 let project = project::Entity::find_by_id(project_id)
208 .filter(
209 Condition::all()
210 .add(project::Column::HostConnectionId.eq(connection.id as i32))
211 .add(
212 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
213 ),
214 )
215 .one(&*tx)
216 .await?
217 .ok_or_else(|| anyhow!("no such project"))?;
218
219 self.update_project_worktrees(project.id, worktrees, &tx)
220 .await?;
221
222 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
223
224 let room = if let Some(room_id) = project.room_id {
225 Some(self.get_room(room_id, &tx).await?)
226 } else {
227 None
228 };
229
230 Ok((room, guest_connection_ids))
231 })
232 .await
233 }
234
235 pub(in crate::db) async fn update_project_worktrees(
236 &self,
237 project_id: ProjectId,
238 worktrees: &[proto::WorktreeMetadata],
239 tx: &DatabaseTransaction,
240 ) -> Result<()> {
241 if !worktrees.is_empty() {
242 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
243 id: ActiveValue::set(worktree.id as i64),
244 project_id: ActiveValue::set(project_id),
245 abs_path: ActiveValue::set(worktree.abs_path.clone()),
246 root_name: ActiveValue::set(worktree.root_name.clone()),
247 visible: ActiveValue::set(worktree.visible),
248 scan_id: ActiveValue::set(0),
249 completed_scan_id: ActiveValue::set(0),
250 }))
251 .on_conflict(
252 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
253 .update_column(worktree::Column::RootName)
254 .to_owned(),
255 )
256 .exec(tx)
257 .await?;
258 }
259
260 worktree::Entity::delete_many()
261 .filter(worktree::Column::ProjectId.eq(project_id).and(
262 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
263 ))
264 .exec(tx)
265 .await?;
266
267 Ok(())
268 }
269
270 pub async fn update_worktree(
271 &self,
272 update: &proto::UpdateWorktree,
273 connection: ConnectionId,
274 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
275 let project_id = ProjectId::from_proto(update.project_id);
276 let worktree_id = update.worktree_id as i64;
277 self.project_transaction(project_id, |tx| async move {
278 // Ensure the update comes from the host.
279 let _project = project::Entity::find_by_id(project_id)
280 .filter(
281 Condition::all()
282 .add(project::Column::HostConnectionId.eq(connection.id as i32))
283 .add(
284 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
285 ),
286 )
287 .one(&*tx)
288 .await?
289 .ok_or_else(|| anyhow!("no such project: {project_id}"))?;
290
291 // Update metadata.
292 worktree::Entity::update(worktree::ActiveModel {
293 id: ActiveValue::set(worktree_id),
294 project_id: ActiveValue::set(project_id),
295 root_name: ActiveValue::set(update.root_name.clone()),
296 scan_id: ActiveValue::set(update.scan_id as i64),
297 completed_scan_id: if update.is_last_update {
298 ActiveValue::set(update.scan_id as i64)
299 } else {
300 ActiveValue::default()
301 },
302 abs_path: ActiveValue::set(update.abs_path.clone()),
303 ..Default::default()
304 })
305 .exec(&*tx)
306 .await?;
307
308 if !update.updated_entries.is_empty() {
309 worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
310 let mtime = entry.mtime.clone().unwrap_or_default();
311 worktree_entry::ActiveModel {
312 project_id: ActiveValue::set(project_id),
313 worktree_id: ActiveValue::set(worktree_id),
314 id: ActiveValue::set(entry.id as i64),
315 is_dir: ActiveValue::set(entry.is_dir),
316 path: ActiveValue::set(entry.path.clone()),
317 inode: ActiveValue::set(entry.inode as i64),
318 mtime_seconds: ActiveValue::set(mtime.seconds as i64),
319 mtime_nanos: ActiveValue::set(mtime.nanos as i32),
320 is_symlink: ActiveValue::set(entry.is_symlink),
321 is_ignored: ActiveValue::set(entry.is_ignored),
322 is_external: ActiveValue::set(entry.is_external),
323 git_status: ActiveValue::set(entry.git_status.map(|status| status as i64)),
324 is_deleted: ActiveValue::set(false),
325 scan_id: ActiveValue::set(update.scan_id as i64),
326 is_fifo: ActiveValue::set(entry.is_fifo),
327 }
328 }))
329 .on_conflict(
330 OnConflict::columns([
331 worktree_entry::Column::ProjectId,
332 worktree_entry::Column::WorktreeId,
333 worktree_entry::Column::Id,
334 ])
335 .update_columns([
336 worktree_entry::Column::IsDir,
337 worktree_entry::Column::Path,
338 worktree_entry::Column::Inode,
339 worktree_entry::Column::MtimeSeconds,
340 worktree_entry::Column::MtimeNanos,
341 worktree_entry::Column::IsSymlink,
342 worktree_entry::Column::IsIgnored,
343 worktree_entry::Column::GitStatus,
344 worktree_entry::Column::ScanId,
345 ])
346 .to_owned(),
347 )
348 .exec(&*tx)
349 .await?;
350 }
351
352 if !update.removed_entries.is_empty() {
353 worktree_entry::Entity::update_many()
354 .filter(
355 worktree_entry::Column::ProjectId
356 .eq(project_id)
357 .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
358 .and(
359 worktree_entry::Column::Id
360 .is_in(update.removed_entries.iter().map(|id| *id as i64)),
361 ),
362 )
363 .set(worktree_entry::ActiveModel {
364 is_deleted: ActiveValue::Set(true),
365 scan_id: ActiveValue::Set(update.scan_id as i64),
366 ..Default::default()
367 })
368 .exec(&*tx)
369 .await?;
370 }
371
372 if !update.updated_repositories.is_empty() {
373 worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
374 |repository| worktree_repository::ActiveModel {
375 project_id: ActiveValue::set(project_id),
376 worktree_id: ActiveValue::set(worktree_id),
377 work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
378 scan_id: ActiveValue::set(update.scan_id as i64),
379 branch: ActiveValue::set(repository.branch.clone()),
380 is_deleted: ActiveValue::set(false),
381 },
382 ))
383 .on_conflict(
384 OnConflict::columns([
385 worktree_repository::Column::ProjectId,
386 worktree_repository::Column::WorktreeId,
387 worktree_repository::Column::WorkDirectoryId,
388 ])
389 .update_columns([
390 worktree_repository::Column::ScanId,
391 worktree_repository::Column::Branch,
392 ])
393 .to_owned(),
394 )
395 .exec(&*tx)
396 .await?;
397 }
398
399 if !update.removed_repositories.is_empty() {
400 worktree_repository::Entity::update_many()
401 .filter(
402 worktree_repository::Column::ProjectId
403 .eq(project_id)
404 .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
405 .and(
406 worktree_repository::Column::WorkDirectoryId
407 .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
408 ),
409 )
410 .set(worktree_repository::ActiveModel {
411 is_deleted: ActiveValue::Set(true),
412 scan_id: ActiveValue::Set(update.scan_id as i64),
413 ..Default::default()
414 })
415 .exec(&*tx)
416 .await?;
417 }
418
419 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
420 Ok(connection_ids)
421 })
422 .await
423 }
424
425 /// Updates the diagnostic summary for the given connection.
426 pub async fn update_diagnostic_summary(
427 &self,
428 update: &proto::UpdateDiagnosticSummary,
429 connection: ConnectionId,
430 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
431 let project_id = ProjectId::from_proto(update.project_id);
432 let worktree_id = update.worktree_id as i64;
433 self.project_transaction(project_id, |tx| async move {
434 let summary = update
435 .summary
436 .as_ref()
437 .ok_or_else(|| anyhow!("invalid summary"))?;
438
439 // Ensure the update comes from the host.
440 let project = project::Entity::find_by_id(project_id)
441 .one(&*tx)
442 .await?
443 .ok_or_else(|| anyhow!("no such project"))?;
444 if project.host_connection()? != connection {
445 return Err(anyhow!("can't update a project hosted by someone else"))?;
446 }
447
448 // Update summary.
449 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
450 project_id: ActiveValue::set(project_id),
451 worktree_id: ActiveValue::set(worktree_id),
452 path: ActiveValue::set(summary.path.clone()),
453 language_server_id: ActiveValue::set(summary.language_server_id as i64),
454 error_count: ActiveValue::set(summary.error_count as i32),
455 warning_count: ActiveValue::set(summary.warning_count as i32),
456 })
457 .on_conflict(
458 OnConflict::columns([
459 worktree_diagnostic_summary::Column::ProjectId,
460 worktree_diagnostic_summary::Column::WorktreeId,
461 worktree_diagnostic_summary::Column::Path,
462 ])
463 .update_columns([
464 worktree_diagnostic_summary::Column::LanguageServerId,
465 worktree_diagnostic_summary::Column::ErrorCount,
466 worktree_diagnostic_summary::Column::WarningCount,
467 ])
468 .to_owned(),
469 )
470 .exec(&*tx)
471 .await?;
472
473 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
474 Ok(connection_ids)
475 })
476 .await
477 }
478
479 /// Starts the language server for the given connection.
480 pub async fn start_language_server(
481 &self,
482 update: &proto::StartLanguageServer,
483 connection: ConnectionId,
484 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
485 let project_id = ProjectId::from_proto(update.project_id);
486 self.project_transaction(project_id, |tx| async move {
487 let server = update
488 .server
489 .as_ref()
490 .ok_or_else(|| anyhow!("invalid language server"))?;
491
492 // Ensure the update comes from the host.
493 let project = project::Entity::find_by_id(project_id)
494 .one(&*tx)
495 .await?
496 .ok_or_else(|| anyhow!("no such project"))?;
497 if project.host_connection()? != connection {
498 return Err(anyhow!("can't update a project hosted by someone else"))?;
499 }
500
501 // Add the newly-started language server.
502 language_server::Entity::insert(language_server::ActiveModel {
503 project_id: ActiveValue::set(project_id),
504 id: ActiveValue::set(server.id as i64),
505 name: ActiveValue::set(server.name.clone()),
506 })
507 .on_conflict(
508 OnConflict::columns([
509 language_server::Column::ProjectId,
510 language_server::Column::Id,
511 ])
512 .update_column(language_server::Column::Name)
513 .to_owned(),
514 )
515 .exec(&*tx)
516 .await?;
517
518 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
519 Ok(connection_ids)
520 })
521 .await
522 }
523
524 /// Updates the worktree settings for the given connection.
525 pub async fn update_worktree_settings(
526 &self,
527 update: &proto::UpdateWorktreeSettings,
528 connection: ConnectionId,
529 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
530 let project_id = ProjectId::from_proto(update.project_id);
531 let kind = match update.kind {
532 Some(kind) => proto::LocalSettingsKind::from_i32(kind)
533 .with_context(|| format!("unknown worktree settings kind: {kind}"))?,
534 None => proto::LocalSettingsKind::Settings,
535 };
536 let kind = LocalSettingsKind::from_proto(kind);
537 self.project_transaction(project_id, |tx| async move {
538 // Ensure the update comes from the host.
539 let project = project::Entity::find_by_id(project_id)
540 .one(&*tx)
541 .await?
542 .ok_or_else(|| anyhow!("no such project"))?;
543 if project.host_connection()? != connection {
544 return Err(anyhow!("can't update a project hosted by someone else"))?;
545 }
546
547 if let Some(content) = &update.content {
548 worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
549 project_id: ActiveValue::Set(project_id),
550 worktree_id: ActiveValue::Set(update.worktree_id as i64),
551 path: ActiveValue::Set(update.path.clone()),
552 content: ActiveValue::Set(content.clone()),
553 kind: ActiveValue::Set(kind),
554 })
555 .on_conflict(
556 OnConflict::columns([
557 worktree_settings_file::Column::ProjectId,
558 worktree_settings_file::Column::WorktreeId,
559 worktree_settings_file::Column::Path,
560 ])
561 .update_column(worktree_settings_file::Column::Content)
562 .to_owned(),
563 )
564 .exec(&*tx)
565 .await?;
566 } else {
567 worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
568 project_id: ActiveValue::Set(project_id),
569 worktree_id: ActiveValue::Set(update.worktree_id as i64),
570 path: ActiveValue::Set(update.path.clone()),
571 ..Default::default()
572 })
573 .exec(&*tx)
574 .await?;
575 }
576
577 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
578 Ok(connection_ids)
579 })
580 .await
581 }
582
583 /// Adds the given connection to the specified hosted project
584 pub async fn join_hosted_project(
585 &self,
586 id: ProjectId,
587 user_id: UserId,
588 connection: ConnectionId,
589 ) -> Result<(Project, ReplicaId)> {
590 self.transaction(|tx| async move {
591 let (project, hosted_project) = project::Entity::find_by_id(id)
592 .find_also_related(hosted_project::Entity)
593 .one(&*tx)
594 .await?
595 .ok_or_else(|| anyhow!("hosted project is no longer shared"))?;
596
597 let Some(hosted_project) = hosted_project else {
598 return Err(anyhow!("project is not hosted"))?;
599 };
600
601 let channel = channel::Entity::find_by_id(hosted_project.channel_id)
602 .one(&*tx)
603 .await?
604 .ok_or_else(|| anyhow!("no such channel"))?;
605
606 let role = self
607 .check_user_is_channel_participant(&channel, user_id, &tx)
608 .await?;
609
610 self.join_project_internal(project, user_id, connection, role, &tx)
611 .await
612 })
613 .await
614 }
615
616 pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
617 self.transaction(|tx| async move {
618 Ok(project::Entity::find_by_id(id)
619 .one(&*tx)
620 .await?
621 .ok_or_else(|| anyhow!("no such project"))?)
622 })
623 .await
624 }
625
626 pub async fn find_dev_server_project(&self, id: DevServerProjectId) -> Result<project::Model> {
627 self.transaction(|tx| async move {
628 Ok(project::Entity::find()
629 .filter(project::Column::DevServerProjectId.eq(id))
630 .one(&*tx)
631 .await?
632 .ok_or_else(|| anyhow!("no such project"))?)
633 })
634 .await
635 }
636
637 /// Adds the given connection to the specified project
638 /// in the current room.
639 pub async fn join_project(
640 &self,
641 project_id: ProjectId,
642 connection: ConnectionId,
643 user_id: UserId,
644 ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
645 self.project_transaction(project_id, |tx| async move {
646 let (project, role) = self
647 .access_project(
648 project_id,
649 connection,
650 PrincipalId::UserId(user_id),
651 Capability::ReadOnly,
652 &tx,
653 )
654 .await?;
655 self.join_project_internal(project, user_id, connection, role, &tx)
656 .await
657 })
658 .await
659 }
660
661 async fn join_project_internal(
662 &self,
663 project: project::Model,
664 user_id: UserId,
665 connection: ConnectionId,
666 role: ChannelRole,
667 tx: &DatabaseTransaction,
668 ) -> Result<(Project, ReplicaId)> {
669 let mut collaborators = project
670 .find_related(project_collaborator::Entity)
671 .all(tx)
672 .await?;
673 let replica_ids = collaborators
674 .iter()
675 .map(|c| c.replica_id)
676 .collect::<HashSet<_>>();
677 let mut replica_id = ReplicaId(1);
678 while replica_ids.contains(&replica_id) {
679 replica_id.0 += 1;
680 }
681 let new_collaborator = project_collaborator::ActiveModel {
682 project_id: ActiveValue::set(project.id),
683 connection_id: ActiveValue::set(connection.id as i32),
684 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
685 user_id: ActiveValue::set(user_id),
686 replica_id: ActiveValue::set(replica_id),
687 is_host: ActiveValue::set(false),
688 ..Default::default()
689 }
690 .insert(tx)
691 .await?;
692 collaborators.push(new_collaborator);
693
694 let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
695 let mut worktrees = db_worktrees
696 .into_iter()
697 .map(|db_worktree| {
698 (
699 db_worktree.id as u64,
700 Worktree {
701 id: db_worktree.id as u64,
702 abs_path: db_worktree.abs_path,
703 root_name: db_worktree.root_name,
704 visible: db_worktree.visible,
705 entries: Default::default(),
706 repository_entries: Default::default(),
707 diagnostic_summaries: Default::default(),
708 settings_files: Default::default(),
709 scan_id: db_worktree.scan_id as u64,
710 completed_scan_id: db_worktree.completed_scan_id as u64,
711 },
712 )
713 })
714 .collect::<BTreeMap<_, _>>();
715
716 // Populate worktree entries.
717 {
718 let mut db_entries = worktree_entry::Entity::find()
719 .filter(
720 Condition::all()
721 .add(worktree_entry::Column::ProjectId.eq(project.id))
722 .add(worktree_entry::Column::IsDeleted.eq(false)),
723 )
724 .stream(tx)
725 .await?;
726 while let Some(db_entry) = db_entries.next().await {
727 let db_entry = db_entry?;
728 if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
729 worktree.entries.push(proto::Entry {
730 id: db_entry.id as u64,
731 is_dir: db_entry.is_dir,
732 path: db_entry.path,
733 inode: db_entry.inode as u64,
734 mtime: Some(proto::Timestamp {
735 seconds: db_entry.mtime_seconds as u64,
736 nanos: db_entry.mtime_nanos as u32,
737 }),
738 is_symlink: db_entry.is_symlink,
739 is_ignored: db_entry.is_ignored,
740 is_external: db_entry.is_external,
741 git_status: db_entry.git_status.map(|status| status as i32),
742 // This is only used in the summarization backlog, so if it's None,
743 // that just means we won't be able to detect when to resummarize
744 // based on total number of backlogged bytes - instead, we'd go
745 // on number of files only. That shouldn't be a huge deal in practice.
746 size: None,
747 is_fifo: db_entry.is_fifo,
748 });
749 }
750 }
751 }
752
753 // Populate repository entries.
754 {
755 let mut db_repository_entries = worktree_repository::Entity::find()
756 .filter(
757 Condition::all()
758 .add(worktree_repository::Column::ProjectId.eq(project.id))
759 .add(worktree_repository::Column::IsDeleted.eq(false)),
760 )
761 .stream(tx)
762 .await?;
763 while let Some(db_repository_entry) = db_repository_entries.next().await {
764 let db_repository_entry = db_repository_entry?;
765 if let Some(worktree) = worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
766 {
767 worktree.repository_entries.insert(
768 db_repository_entry.work_directory_id as u64,
769 proto::RepositoryEntry {
770 work_directory_id: db_repository_entry.work_directory_id as u64,
771 branch: db_repository_entry.branch,
772 },
773 );
774 }
775 }
776 }
777
778 // Populate worktree diagnostic summaries.
779 {
780 let mut db_summaries = worktree_diagnostic_summary::Entity::find()
781 .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
782 .stream(tx)
783 .await?;
784 while let Some(db_summary) = db_summaries.next().await {
785 let db_summary = db_summary?;
786 if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
787 worktree
788 .diagnostic_summaries
789 .push(proto::DiagnosticSummary {
790 path: db_summary.path,
791 language_server_id: db_summary.language_server_id as u64,
792 error_count: db_summary.error_count as u32,
793 warning_count: db_summary.warning_count as u32,
794 });
795 }
796 }
797 }
798
799 // Populate worktree settings files
800 {
801 let mut db_settings_files = worktree_settings_file::Entity::find()
802 .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
803 .stream(tx)
804 .await?;
805 while let Some(db_settings_file) = db_settings_files.next().await {
806 let db_settings_file = db_settings_file?;
807 if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
808 worktree.settings_files.push(WorktreeSettingsFile {
809 path: db_settings_file.path,
810 content: db_settings_file.content,
811 kind: db_settings_file.kind,
812 });
813 }
814 }
815 }
816
817 // Populate language servers.
818 let language_servers = project
819 .find_related(language_server::Entity)
820 .all(tx)
821 .await?;
822
823 let project = Project {
824 id: project.id,
825 role,
826 collaborators: collaborators
827 .into_iter()
828 .map(|collaborator| ProjectCollaborator {
829 connection_id: collaborator.connection(),
830 user_id: collaborator.user_id,
831 replica_id: collaborator.replica_id,
832 is_host: collaborator.is_host,
833 })
834 .collect(),
835 worktrees,
836 language_servers: language_servers
837 .into_iter()
838 .map(|language_server| proto::LanguageServer {
839 id: language_server.id as u64,
840 name: language_server.name,
841 worktree_id: None,
842 })
843 .collect(),
844 dev_server_project_id: project.dev_server_project_id,
845 };
846 Ok((project, replica_id as ReplicaId))
847 }
848
849 pub async fn leave_hosted_project(
850 &self,
851 project_id: ProjectId,
852 connection: ConnectionId,
853 ) -> Result<LeftProject> {
854 self.transaction(|tx| async move {
855 let result = project_collaborator::Entity::delete_many()
856 .filter(
857 Condition::all()
858 .add(project_collaborator::Column::ProjectId.eq(project_id))
859 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
860 .add(
861 project_collaborator::Column::ConnectionServerId
862 .eq(connection.owner_id as i32),
863 ),
864 )
865 .exec(&*tx)
866 .await?;
867 if result.rows_affected == 0 {
868 return Err(anyhow!("not in the project"))?;
869 }
870
871 let project = project::Entity::find_by_id(project_id)
872 .one(&*tx)
873 .await?
874 .ok_or_else(|| anyhow!("no such project"))?;
875 let collaborators = project
876 .find_related(project_collaborator::Entity)
877 .all(&*tx)
878 .await?;
879 let connection_ids = collaborators
880 .into_iter()
881 .map(|collaborator| collaborator.connection())
882 .collect();
883 Ok(LeftProject {
884 id: project.id,
885 connection_ids,
886 should_unshare: false,
887 })
888 })
889 .await
890 }
891
892 /// Removes the given connection from the specified project.
893 pub async fn leave_project(
894 &self,
895 project_id: ProjectId,
896 connection: ConnectionId,
897 ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
898 self.project_transaction(project_id, |tx| async move {
899 let result = project_collaborator::Entity::delete_many()
900 .filter(
901 Condition::all()
902 .add(project_collaborator::Column::ProjectId.eq(project_id))
903 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
904 .add(
905 project_collaborator::Column::ConnectionServerId
906 .eq(connection.owner_id as i32),
907 ),
908 )
909 .exec(&*tx)
910 .await?;
911 if result.rows_affected == 0 {
912 Err(anyhow!("not a collaborator on this project"))?;
913 }
914
915 let project = project::Entity::find_by_id(project_id)
916 .one(&*tx)
917 .await?
918 .ok_or_else(|| anyhow!("no such project"))?;
919 let collaborators = project
920 .find_related(project_collaborator::Entity)
921 .all(&*tx)
922 .await?;
923 let connection_ids: Vec<ConnectionId> = collaborators
924 .into_iter()
925 .map(|collaborator| collaborator.connection())
926 .collect();
927
928 follower::Entity::delete_many()
929 .filter(
930 Condition::any()
931 .add(
932 Condition::all()
933 .add(follower::Column::ProjectId.eq(Some(project_id)))
934 .add(
935 follower::Column::LeaderConnectionServerId
936 .eq(connection.owner_id),
937 )
938 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
939 )
940 .add(
941 Condition::all()
942 .add(follower::Column::ProjectId.eq(Some(project_id)))
943 .add(
944 follower::Column::FollowerConnectionServerId
945 .eq(connection.owner_id),
946 )
947 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
948 ),
949 )
950 .exec(&*tx)
951 .await?;
952
953 let room = if let Some(room_id) = project.room_id {
954 Some(self.get_room(room_id, &tx).await?)
955 } else {
956 None
957 };
958
959 let left_project = LeftProject {
960 id: project_id,
961 should_unshare: connection == project.host_connection()?,
962 connection_ids,
963 };
964 Ok((room, left_project))
965 })
966 .await
967 }
968
969 pub async fn check_user_is_project_host(
970 &self,
971 project_id: ProjectId,
972 connection_id: ConnectionId,
973 ) -> Result<()> {
974 self.project_transaction(project_id, |tx| async move {
975 project::Entity::find()
976 .filter(
977 Condition::all()
978 .add(project::Column::Id.eq(project_id))
979 .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
980 .add(
981 project::Column::HostConnectionServerId
982 .eq(Some(connection_id.owner_id as i32)),
983 ),
984 )
985 .one(&*tx)
986 .await?
987 .ok_or_else(|| anyhow!("failed to read project host"))?;
988
989 Ok(())
990 })
991 .await
992 .map(|guard| guard.into_inner())
993 }
994
995 /// Returns the current project if the given user is authorized to access it with the specified capability.
996 pub async fn access_project(
997 &self,
998 project_id: ProjectId,
999 connection_id: ConnectionId,
1000 principal_id: PrincipalId,
1001 capability: Capability,
1002 tx: &DatabaseTransaction,
1003 ) -> Result<(project::Model, ChannelRole)> {
1004 let (mut project, dev_server_project) = project::Entity::find_by_id(project_id)
1005 .find_also_related(dev_server_project::Entity)
1006 .one(tx)
1007 .await?
1008 .ok_or_else(|| anyhow!("no such project"))?;
1009
1010 let user_id = match principal_id {
1011 PrincipalId::DevServerId(_) => {
1012 if project
1013 .host_connection()
1014 .is_ok_and(|connection| connection == connection_id)
1015 {
1016 return Ok((project, ChannelRole::Admin));
1017 }
1018 return Err(anyhow!("not the project host"))?;
1019 }
1020 PrincipalId::UserId(user_id) => user_id,
1021 };
1022
1023 let role_from_room = if let Some(room_id) = project.room_id {
1024 room_participant::Entity::find()
1025 .filter(room_participant::Column::RoomId.eq(room_id))
1026 .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
1027 .one(tx)
1028 .await?
1029 .and_then(|participant| participant.role)
1030 } else {
1031 None
1032 };
1033 let role_from_dev_server = if let Some(dev_server_project) = dev_server_project {
1034 let dev_server = dev_server::Entity::find_by_id(dev_server_project.dev_server_id)
1035 .one(tx)
1036 .await?
1037 .ok_or_else(|| anyhow!("no such channel"))?;
1038 if user_id == dev_server.user_id {
1039 // If the user left the room "uncleanly" they may rejoin the
1040 // remote project before leave_room runs. IN that case kick
1041 // the project out of the room pre-emptively.
1042 if role_from_room.is_none() {
1043 project = project::Entity::update(project::ActiveModel {
1044 room_id: ActiveValue::Set(None),
1045 ..project.into_active_model()
1046 })
1047 .exec(tx)
1048 .await?;
1049 }
1050 Some(ChannelRole::Admin)
1051 } else {
1052 None
1053 }
1054 } else {
1055 None
1056 };
1057
1058 let role = role_from_dev_server
1059 .or(role_from_room)
1060 .unwrap_or(ChannelRole::Banned);
1061
1062 match capability {
1063 Capability::ReadWrite => {
1064 if !role.can_edit_projects() {
1065 return Err(anyhow!("not authorized to edit projects"))?;
1066 }
1067 }
1068 Capability::ReadOnly => {
1069 if !role.can_read_projects() {
1070 return Err(anyhow!("not authorized to read projects"))?;
1071 }
1072 }
1073 }
1074
1075 Ok((project, role))
1076 }
1077
1078 /// Returns the host connection for a read-only request to join a shared project.
1079 pub async fn host_for_read_only_project_request(
1080 &self,
1081 project_id: ProjectId,
1082 connection_id: ConnectionId,
1083 user_id: UserId,
1084 ) -> Result<ConnectionId> {
1085 self.project_transaction(project_id, |tx| async move {
1086 let (project, _) = self
1087 .access_project(
1088 project_id,
1089 connection_id,
1090 PrincipalId::UserId(user_id),
1091 Capability::ReadOnly,
1092 &tx,
1093 )
1094 .await?;
1095 project.host_connection()
1096 })
1097 .await
1098 .map(|guard| guard.into_inner())
1099 }
1100
1101 /// Returns the host connection for a request to join a shared project.
1102 pub async fn host_for_mutating_project_request(
1103 &self,
1104 project_id: ProjectId,
1105 connection_id: ConnectionId,
1106 user_id: UserId,
1107 ) -> Result<ConnectionId> {
1108 self.project_transaction(project_id, |tx| async move {
1109 let (project, _) = self
1110 .access_project(
1111 project_id,
1112 connection_id,
1113 PrincipalId::UserId(user_id),
1114 Capability::ReadWrite,
1115 &tx,
1116 )
1117 .await?;
1118 project.host_connection()
1119 })
1120 .await
1121 .map(|guard| guard.into_inner())
1122 }
1123
1124 /// Returns the host connection for a request to join a shared project.
1125 pub async fn host_for_owner_project_request(
1126 &self,
1127 project_id: ProjectId,
1128 _connection_id: ConnectionId,
1129 user_id: UserId,
1130 ) -> Result<ConnectionId> {
1131 self.project_transaction(project_id, |tx| async move {
1132 let (project, dev_server_project) = project::Entity::find_by_id(project_id)
1133 .find_also_related(dev_server_project::Entity)
1134 .one(&*tx)
1135 .await?
1136 .ok_or_else(|| anyhow!("no such project"))?;
1137
1138 let Some(dev_server_project) = dev_server_project else {
1139 return Err(anyhow!("not a dev server project"))?;
1140 };
1141 let dev_server = dev_server::Entity::find_by_id(dev_server_project.dev_server_id)
1142 .one(&*tx)
1143 .await?
1144 .ok_or_else(|| anyhow!("no such dev server"))?;
1145 if dev_server.user_id != user_id {
1146 return Err(anyhow!("not your project"))?;
1147 }
1148 project.host_connection()
1149 })
1150 .await
1151 .map(|guard| guard.into_inner())
1152 }
1153
1154 pub async fn connections_for_buffer_update(
1155 &self,
1156 project_id: ProjectId,
1157 principal_id: PrincipalId,
1158 connection_id: ConnectionId,
1159 capability: Capability,
1160 ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1161 self.project_transaction(project_id, |tx| async move {
1162 // Authorize
1163 let (project, _) = self
1164 .access_project(project_id, connection_id, principal_id, capability, &tx)
1165 .await?;
1166
1167 let host_connection_id = project.host_connection()?;
1168
1169 let collaborators = project_collaborator::Entity::find()
1170 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1171 .all(&*tx)
1172 .await?;
1173
1174 let guest_connection_ids = collaborators
1175 .into_iter()
1176 .filter_map(|collaborator| {
1177 if collaborator.is_host {
1178 None
1179 } else {
1180 Some(collaborator.connection())
1181 }
1182 })
1183 .collect();
1184
1185 Ok((host_connection_id, guest_connection_ids))
1186 })
1187 .await
1188 }
1189
1190 /// Returns the connection IDs in the given project.
1191 ///
1192 /// The provided `connection_id` must also be a collaborator in the project,
1193 /// otherwise an error will be returned.
1194 pub async fn project_connection_ids(
1195 &self,
1196 project_id: ProjectId,
1197 connection_id: ConnectionId,
1198 exclude_dev_server: bool,
1199 ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1200 self.project_transaction(project_id, |tx| async move {
1201 let project = project::Entity::find_by_id(project_id)
1202 .one(&*tx)
1203 .await?
1204 .ok_or_else(|| anyhow!("no such project"))?;
1205
1206 let mut collaborators = project_collaborator::Entity::find()
1207 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1208 .stream(&*tx)
1209 .await?;
1210
1211 let mut connection_ids = HashSet::default();
1212 if let Some(host_connection) = project.host_connection().log_err() {
1213 if !exclude_dev_server {
1214 connection_ids.insert(host_connection);
1215 }
1216 }
1217
1218 while let Some(collaborator) = collaborators.next().await {
1219 let collaborator = collaborator?;
1220 connection_ids.insert(collaborator.connection());
1221 }
1222
1223 if connection_ids.contains(&connection_id)
1224 || Some(connection_id) == project.host_connection().ok()
1225 {
1226 Ok(connection_ids)
1227 } else {
1228 Err(anyhow!(
1229 "can only send project updates to a project you're in"
1230 ))?
1231 }
1232 })
1233 .await
1234 }
1235
1236 async fn project_guest_connection_ids(
1237 &self,
1238 project_id: ProjectId,
1239 tx: &DatabaseTransaction,
1240 ) -> Result<Vec<ConnectionId>> {
1241 let mut collaborators = project_collaborator::Entity::find()
1242 .filter(
1243 project_collaborator::Column::ProjectId
1244 .eq(project_id)
1245 .and(project_collaborator::Column::IsHost.eq(false)),
1246 )
1247 .stream(tx)
1248 .await?;
1249
1250 let mut guest_connection_ids = Vec::new();
1251 while let Some(collaborator) = collaborators.next().await {
1252 let collaborator = collaborator?;
1253 guest_connection_ids.push(collaborator.connection());
1254 }
1255 Ok(guest_connection_ids)
1256 }
1257
1258 /// Returns the [`RoomId`] for the given project.
1259 pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1260 self.transaction(|tx| async move {
1261 Ok(project::Entity::find_by_id(project_id)
1262 .one(&*tx)
1263 .await?
1264 .and_then(|project| project.room_id))
1265 })
1266 .await
1267 }
1268
1269 pub async fn check_room_participants(
1270 &self,
1271 room_id: RoomId,
1272 leader_id: ConnectionId,
1273 follower_id: ConnectionId,
1274 ) -> Result<()> {
1275 self.transaction(|tx| async move {
1276 use room_participant::Column;
1277
1278 let count = room_participant::Entity::find()
1279 .filter(
1280 Condition::all().add(Column::RoomId.eq(room_id)).add(
1281 Condition::any()
1282 .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1283 Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1284 ))
1285 .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1286 Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1287 )),
1288 ),
1289 )
1290 .count(&*tx)
1291 .await?;
1292
1293 if count < 2 {
1294 Err(anyhow!("not room participants"))?;
1295 }
1296
1297 Ok(())
1298 })
1299 .await
1300 }
1301
1302 /// Adds the given follower connection as a follower of the given leader connection.
1303 pub async fn follow(
1304 &self,
1305 room_id: RoomId,
1306 project_id: ProjectId,
1307 leader_connection: ConnectionId,
1308 follower_connection: ConnectionId,
1309 ) -> Result<TransactionGuard<proto::Room>> {
1310 self.room_transaction(room_id, |tx| async move {
1311 follower::ActiveModel {
1312 room_id: ActiveValue::set(room_id),
1313 project_id: ActiveValue::set(project_id),
1314 leader_connection_server_id: ActiveValue::set(ServerId(
1315 leader_connection.owner_id as i32,
1316 )),
1317 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1318 follower_connection_server_id: ActiveValue::set(ServerId(
1319 follower_connection.owner_id as i32,
1320 )),
1321 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1322 ..Default::default()
1323 }
1324 .insert(&*tx)
1325 .await?;
1326
1327 let room = self.get_room(room_id, &tx).await?;
1328 Ok(room)
1329 })
1330 .await
1331 }
1332
1333 /// Removes the given follower connection as a follower of the given leader connection.
1334 pub async fn unfollow(
1335 &self,
1336 room_id: RoomId,
1337 project_id: ProjectId,
1338 leader_connection: ConnectionId,
1339 follower_connection: ConnectionId,
1340 ) -> Result<TransactionGuard<proto::Room>> {
1341 self.room_transaction(room_id, |tx| async move {
1342 follower::Entity::delete_many()
1343 .filter(
1344 Condition::all()
1345 .add(follower::Column::RoomId.eq(room_id))
1346 .add(follower::Column::ProjectId.eq(project_id))
1347 .add(
1348 follower::Column::LeaderConnectionServerId
1349 .eq(leader_connection.owner_id),
1350 )
1351 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1352 .add(
1353 follower::Column::FollowerConnectionServerId
1354 .eq(follower_connection.owner_id),
1355 )
1356 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1357 )
1358 .exec(&*tx)
1359 .await?;
1360
1361 let room = self.get_room(room_id, &tx).await?;
1362 Ok(room)
1363 })
1364 .await
1365 }
1366}