1use util::ResultExt;
2
3use super::*;
4
5impl Database {
6 /// Returns the count of all projects, excluding ones marked as admin.
7 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
8 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
9 enum QueryAs {
10 Count,
11 }
12
13 self.transaction(|tx| async move {
14 Ok(project::Entity::find()
15 .select_only()
16 .column_as(project::Column::Id.count(), QueryAs::Count)
17 .inner_join(user::Entity)
18 .filter(user::Column::Admin.eq(false))
19 .into_values::<_, QueryAs>()
20 .one(&*tx)
21 .await?
22 .unwrap_or(0i64) as usize)
23 })
24 .await
25 }
26
27 /// Shares a project with the given room.
28 pub async fn share_project(
29 &self,
30 room_id: RoomId,
31 connection: ConnectionId,
32 worktrees: &[proto::WorktreeMetadata],
33 dev_server_project_id: Option<DevServerProjectId>,
34 ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
35 self.room_transaction(room_id, |tx| async move {
36 let participant = room_participant::Entity::find()
37 .filter(
38 Condition::all()
39 .add(
40 room_participant::Column::AnsweringConnectionId
41 .eq(connection.id as i32),
42 )
43 .add(
44 room_participant::Column::AnsweringConnectionServerId
45 .eq(connection.owner_id as i32),
46 ),
47 )
48 .one(&*tx)
49 .await?
50 .ok_or_else(|| anyhow!("could not find participant"))?;
51 if participant.room_id != room_id {
52 return Err(anyhow!("shared project on unexpected room"))?;
53 }
54 if !participant
55 .role
56 .unwrap_or(ChannelRole::Member)
57 .can_edit_projects()
58 {
59 return Err(anyhow!("guests cannot share projects"))?;
60 }
61
62 if let Some(dev_server_project_id) = dev_server_project_id {
63 let project = project::Entity::find()
64 .filter(project::Column::DevServerProjectId.eq(Some(dev_server_project_id)))
65 .one(&*tx)
66 .await?
67 .ok_or_else(|| anyhow!("no remote project"))?;
68
69 if project.room_id.is_some() {
70 return Err(anyhow!("project already shared"))?;
71 };
72
73 let project = project::Entity::update(project::ActiveModel {
74 room_id: ActiveValue::Set(Some(room_id)),
75 ..project.into_active_model()
76 })
77 .exec(&*tx)
78 .await?;
79
80 // todo! check user is a project-collaborator
81 let room = self.get_room(room_id, &tx).await?;
82 return Ok((project.id, room));
83 }
84
85 let project = project::ActiveModel {
86 room_id: ActiveValue::set(Some(participant.room_id)),
87 host_user_id: ActiveValue::set(Some(participant.user_id)),
88 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
89 host_connection_server_id: ActiveValue::set(Some(ServerId(
90 connection.owner_id as i32,
91 ))),
92 id: ActiveValue::NotSet,
93 hosted_project_id: ActiveValue::Set(None),
94 dev_server_project_id: ActiveValue::Set(None),
95 }
96 .insert(&*tx)
97 .await?;
98
99 if !worktrees.is_empty() {
100 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
101 worktree::ActiveModel {
102 id: ActiveValue::set(worktree.id as i64),
103 project_id: ActiveValue::set(project.id),
104 abs_path: ActiveValue::set(worktree.abs_path.clone()),
105 root_name: ActiveValue::set(worktree.root_name.clone()),
106 visible: ActiveValue::set(worktree.visible),
107 scan_id: ActiveValue::set(0),
108 completed_scan_id: ActiveValue::set(0),
109 }
110 }))
111 .exec(&*tx)
112 .await?;
113 }
114
115 project_collaborator::ActiveModel {
116 project_id: ActiveValue::set(project.id),
117 connection_id: ActiveValue::set(connection.id as i32),
118 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
119 user_id: ActiveValue::set(participant.user_id),
120 replica_id: ActiveValue::set(ReplicaId(0)),
121 is_host: ActiveValue::set(true),
122 ..Default::default()
123 }
124 .insert(&*tx)
125 .await?;
126
127 let room = self.get_room(room_id, &tx).await?;
128 Ok((project.id, room))
129 })
130 .await
131 }
132
133 pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
134 self.weak_transaction(|tx| async move {
135 project::Entity::delete_by_id(project_id).exec(&*tx).await?;
136 Ok(())
137 })
138 .await
139 }
140
141 /// Unshares the given project.
142 pub async fn unshare_project(
143 &self,
144 project_id: ProjectId,
145 connection: ConnectionId,
146 user_id: Option<UserId>,
147 ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
148 self.project_transaction(project_id, |tx| async move {
149 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
150 let project = project::Entity::find_by_id(project_id)
151 .one(&*tx)
152 .await?
153 .ok_or_else(|| anyhow!("project not found"))?;
154 let room = if let Some(room_id) = project.room_id {
155 Some(self.get_room(room_id, &tx).await?)
156 } else {
157 None
158 };
159 if project.host_connection()? == connection {
160 return Ok((true, room, guest_connection_ids));
161 }
162 if let Some(dev_server_project_id) = project.dev_server_project_id {
163 if let Some(user_id) = user_id {
164 if user_id
165 != self
166 .owner_for_dev_server_project(dev_server_project_id, &tx)
167 .await?
168 {
169 Err(anyhow!("cannot unshare a project hosted by another user"))?
170 }
171 project::Entity::update(project::ActiveModel {
172 room_id: ActiveValue::Set(None),
173 ..project.into_active_model()
174 })
175 .exec(&*tx)
176 .await?;
177 return Ok((false, room, guest_connection_ids));
178 }
179 }
180
181 Err(anyhow!("cannot unshare a project hosted by another user"))?
182 })
183 .await
184 }
185
186 /// Updates the worktrees associated with the given project.
187 pub async fn update_project(
188 &self,
189 project_id: ProjectId,
190 connection: ConnectionId,
191 worktrees: &[proto::WorktreeMetadata],
192 ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
193 self.project_transaction(project_id, |tx| async move {
194 let project = project::Entity::find_by_id(project_id)
195 .filter(
196 Condition::all()
197 .add(project::Column::HostConnectionId.eq(connection.id as i32))
198 .add(
199 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
200 ),
201 )
202 .one(&*tx)
203 .await?
204 .ok_or_else(|| anyhow!("no such project"))?;
205
206 self.update_project_worktrees(project.id, worktrees, &tx)
207 .await?;
208
209 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
210
211 let room = if let Some(room_id) = project.room_id {
212 Some(self.get_room(room_id, &tx).await?)
213 } else {
214 None
215 };
216
217 Ok((room, guest_connection_ids))
218 })
219 .await
220 }
221
222 pub(in crate::db) async fn update_project_worktrees(
223 &self,
224 project_id: ProjectId,
225 worktrees: &[proto::WorktreeMetadata],
226 tx: &DatabaseTransaction,
227 ) -> Result<()> {
228 if !worktrees.is_empty() {
229 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
230 id: ActiveValue::set(worktree.id as i64),
231 project_id: ActiveValue::set(project_id),
232 abs_path: ActiveValue::set(worktree.abs_path.clone()),
233 root_name: ActiveValue::set(worktree.root_name.clone()),
234 visible: ActiveValue::set(worktree.visible),
235 scan_id: ActiveValue::set(0),
236 completed_scan_id: ActiveValue::set(0),
237 }))
238 .on_conflict(
239 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
240 .update_column(worktree::Column::RootName)
241 .to_owned(),
242 )
243 .exec(tx)
244 .await?;
245 }
246
247 worktree::Entity::delete_many()
248 .filter(worktree::Column::ProjectId.eq(project_id).and(
249 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
250 ))
251 .exec(tx)
252 .await?;
253
254 Ok(())
255 }
256
    /// Applies an `UpdateWorktree` message from the project's host to the database.
    ///
    /// Inside one project transaction this method:
    /// 1. verifies that `connection` is the host connection of `update.project_id`
    ///    (otherwise it fails with "no such project"),
    /// 2. updates the worktree's metadata row,
    /// 3. upserts `update.updated_entries` and flags `update.removed_entries` as deleted,
    /// 4. upserts `update.updated_repositories` and flags `update.removed_repositories`
    ///    as deleted.
    ///
    /// Returns the connection ids produced by `project_guest_connection_ids`.
    pub async fn update_worktree(
        &self,
        update: &proto::UpdateWorktree,
        connection: ConnectionId,
    ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
        let project_id = ProjectId::from_proto(update.project_id);
        let worktree_id = update.worktree_id as i64;
        self.project_transaction(project_id, |tx| async move {
            // Ensure the update comes from the host.
            let _project = project::Entity::find_by_id(project_id)
                .filter(
                    Condition::all()
                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
                        .add(
                            project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
                        ),
                )
                .one(&*tx)
                .await?
                .ok_or_else(|| anyhow!("no such project"))?;

            // Update metadata.
            worktree::Entity::update(worktree::ActiveModel {
                id: ActiveValue::set(worktree_id),
                project_id: ActiveValue::set(project_id),
                root_name: ActiveValue::set(update.root_name.clone()),
                scan_id: ActiveValue::set(update.scan_id as i64),
                // `completed_scan_id` is only written for the last update of a scan;
                // otherwise the field is left at `ActiveValue::default()`
                // (presumably "not set", i.e. the column is untouched — confirm).
                completed_scan_id: if update.is_last_update {
                    ActiveValue::set(update.scan_id as i64)
                } else {
                    ActiveValue::default()
                },
                abs_path: ActiveValue::set(update.abs_path.clone()),
                ..Default::default()
            })
            .exec(&*tx)
            .await?;

            // Upsert new/changed entries, keyed by (project_id, worktree_id, id).
            if !update.updated_entries.is_empty() {
                worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
                    // A missing mtime falls back to the timestamp type's default value.
                    let mtime = entry.mtime.clone().unwrap_or_default();
                    worktree_entry::ActiveModel {
                        project_id: ActiveValue::set(project_id),
                        worktree_id: ActiveValue::set(worktree_id),
                        id: ActiveValue::set(entry.id as i64),
                        is_dir: ActiveValue::set(entry.is_dir),
                        path: ActiveValue::set(entry.path.clone()),
                        inode: ActiveValue::set(entry.inode as i64),
                        mtime_seconds: ActiveValue::set(mtime.seconds as i64),
                        mtime_nanos: ActiveValue::set(mtime.nanos as i32),
                        is_symlink: ActiveValue::set(entry.is_symlink),
                        is_ignored: ActiveValue::set(entry.is_ignored),
                        is_external: ActiveValue::set(entry.is_external),
                        git_status: ActiveValue::set(entry.git_status.map(|status| status as i64)),
                        is_deleted: ActiveValue::set(false),
                        scan_id: ActiveValue::set(update.scan_id as i64),
                    }
                }))
                .on_conflict(
                    // NOTE(review): `IsExternal` and `IsDeleted` are not in the update
                    // list below, so an existing row keeps its previous values for
                    // those columns on conflict — confirm this is intended.
                    OnConflict::columns([
                        worktree_entry::Column::ProjectId,
                        worktree_entry::Column::WorktreeId,
                        worktree_entry::Column::Id,
                    ])
                    .update_columns([
                        worktree_entry::Column::IsDir,
                        worktree_entry::Column::Path,
                        worktree_entry::Column::Inode,
                        worktree_entry::Column::MtimeSeconds,
                        worktree_entry::Column::MtimeNanos,
                        worktree_entry::Column::IsSymlink,
                        worktree_entry::Column::IsIgnored,
                        worktree_entry::Column::GitStatus,
                        worktree_entry::Column::ScanId,
                    ])
                    .to_owned(),
                )
                .exec(&*tx)
                .await?;
            }

            // Removed entries are soft-deleted (flagged) rather than dropped, and are
            // stamped with the scan id of this update.
            if !update.removed_entries.is_empty() {
                worktree_entry::Entity::update_many()
                    .filter(
                        worktree_entry::Column::ProjectId
                            .eq(project_id)
                            .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
                            .and(
                                worktree_entry::Column::Id
                                    .is_in(update.removed_entries.iter().map(|id| *id as i64)),
                            ),
                    )
                    .set(worktree_entry::ActiveModel {
                        is_deleted: ActiveValue::Set(true),
                        scan_id: ActiveValue::Set(update.scan_id as i64),
                        ..Default::default()
                    })
                    .exec(&*tx)
                    .await?;
            }

            // Upsert repositories, keyed by (project_id, worktree_id, work_directory_id).
            if !update.updated_repositories.is_empty() {
                worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
                    |repository| worktree_repository::ActiveModel {
                        project_id: ActiveValue::set(project_id),
                        worktree_id: ActiveValue::set(worktree_id),
                        work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
                        scan_id: ActiveValue::set(update.scan_id as i64),
                        branch: ActiveValue::set(repository.branch.clone()),
                        is_deleted: ActiveValue::set(false),
                    },
                ))
                .on_conflict(
                    // NOTE(review): only `ScanId` and `Branch` are refreshed on conflict;
                    // `IsDeleted` is not, so a previously soft-deleted repository row
                    // would stay flagged as deleted — confirm this is intended.
                    OnConflict::columns([
                        worktree_repository::Column::ProjectId,
                        worktree_repository::Column::WorktreeId,
                        worktree_repository::Column::WorkDirectoryId,
                    ])
                    .update_columns([
                        worktree_repository::Column::ScanId,
                        worktree_repository::Column::Branch,
                    ])
                    .to_owned(),
                )
                .exec(&*tx)
                .await?;
            }

            // Removed repositories are soft-deleted in the same way as removed entries.
            if !update.removed_repositories.is_empty() {
                worktree_repository::Entity::update_many()
                    .filter(
                        worktree_repository::Column::ProjectId
                            .eq(project_id)
                            .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
                            .and(
                                worktree_repository::Column::WorkDirectoryId
                                    .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
                            ),
                    )
                    .set(worktree_repository::ActiveModel {
                        is_deleted: ActiveValue::Set(true),
                        scan_id: ActiveValue::Set(update.scan_id as i64),
                        ..Default::default()
                    })
                    .exec(&*tx)
                    .await?;
            }

            let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
            Ok(connection_ids)
        })
        .await
    }
410
411 /// Updates the diagnostic summary for the given connection.
412 pub async fn update_diagnostic_summary(
413 &self,
414 update: &proto::UpdateDiagnosticSummary,
415 connection: ConnectionId,
416 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
417 let project_id = ProjectId::from_proto(update.project_id);
418 let worktree_id = update.worktree_id as i64;
419 self.project_transaction(project_id, |tx| async move {
420 let summary = update
421 .summary
422 .as_ref()
423 .ok_or_else(|| anyhow!("invalid summary"))?;
424
425 // Ensure the update comes from the host.
426 let project = project::Entity::find_by_id(project_id)
427 .one(&*tx)
428 .await?
429 .ok_or_else(|| anyhow!("no such project"))?;
430 if project.host_connection()? != connection {
431 return Err(anyhow!("can't update a project hosted by someone else"))?;
432 }
433
434 // Update summary.
435 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
436 project_id: ActiveValue::set(project_id),
437 worktree_id: ActiveValue::set(worktree_id),
438 path: ActiveValue::set(summary.path.clone()),
439 language_server_id: ActiveValue::set(summary.language_server_id as i64),
440 error_count: ActiveValue::set(summary.error_count as i32),
441 warning_count: ActiveValue::set(summary.warning_count as i32),
442 })
443 .on_conflict(
444 OnConflict::columns([
445 worktree_diagnostic_summary::Column::ProjectId,
446 worktree_diagnostic_summary::Column::WorktreeId,
447 worktree_diagnostic_summary::Column::Path,
448 ])
449 .update_columns([
450 worktree_diagnostic_summary::Column::LanguageServerId,
451 worktree_diagnostic_summary::Column::ErrorCount,
452 worktree_diagnostic_summary::Column::WarningCount,
453 ])
454 .to_owned(),
455 )
456 .exec(&*tx)
457 .await?;
458
459 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
460 Ok(connection_ids)
461 })
462 .await
463 }
464
465 /// Starts the language server for the given connection.
466 pub async fn start_language_server(
467 &self,
468 update: &proto::StartLanguageServer,
469 connection: ConnectionId,
470 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
471 let project_id = ProjectId::from_proto(update.project_id);
472 self.project_transaction(project_id, |tx| async move {
473 let server = update
474 .server
475 .as_ref()
476 .ok_or_else(|| anyhow!("invalid language server"))?;
477
478 // Ensure the update comes from the host.
479 let project = project::Entity::find_by_id(project_id)
480 .one(&*tx)
481 .await?
482 .ok_or_else(|| anyhow!("no such project"))?;
483 if project.host_connection()? != connection {
484 return Err(anyhow!("can't update a project hosted by someone else"))?;
485 }
486
487 // Add the newly-started language server.
488 language_server::Entity::insert(language_server::ActiveModel {
489 project_id: ActiveValue::set(project_id),
490 id: ActiveValue::set(server.id as i64),
491 name: ActiveValue::set(server.name.clone()),
492 })
493 .on_conflict(
494 OnConflict::columns([
495 language_server::Column::ProjectId,
496 language_server::Column::Id,
497 ])
498 .update_column(language_server::Column::Name)
499 .to_owned(),
500 )
501 .exec(&*tx)
502 .await?;
503
504 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
505 Ok(connection_ids)
506 })
507 .await
508 }
509
510 /// Updates the worktree settings for the given connection.
511 pub async fn update_worktree_settings(
512 &self,
513 update: &proto::UpdateWorktreeSettings,
514 connection: ConnectionId,
515 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
516 let project_id = ProjectId::from_proto(update.project_id);
517 self.project_transaction(project_id, |tx| async move {
518 // Ensure the update comes from the host.
519 let project = project::Entity::find_by_id(project_id)
520 .one(&*tx)
521 .await?
522 .ok_or_else(|| anyhow!("no such project"))?;
523 if project.host_connection()? != connection {
524 return Err(anyhow!("can't update a project hosted by someone else"))?;
525 }
526
527 if let Some(content) = &update.content {
528 worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
529 project_id: ActiveValue::Set(project_id),
530 worktree_id: ActiveValue::Set(update.worktree_id as i64),
531 path: ActiveValue::Set(update.path.clone()),
532 content: ActiveValue::Set(content.clone()),
533 })
534 .on_conflict(
535 OnConflict::columns([
536 worktree_settings_file::Column::ProjectId,
537 worktree_settings_file::Column::WorktreeId,
538 worktree_settings_file::Column::Path,
539 ])
540 .update_column(worktree_settings_file::Column::Content)
541 .to_owned(),
542 )
543 .exec(&*tx)
544 .await?;
545 } else {
546 worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
547 project_id: ActiveValue::Set(project_id),
548 worktree_id: ActiveValue::Set(update.worktree_id as i64),
549 path: ActiveValue::Set(update.path.clone()),
550 ..Default::default()
551 })
552 .exec(&*tx)
553 .await?;
554 }
555
556 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
557 Ok(connection_ids)
558 })
559 .await
560 }
561
562 /// Adds the given connection to the specified hosted project
563 pub async fn join_hosted_project(
564 &self,
565 id: ProjectId,
566 user_id: UserId,
567 connection: ConnectionId,
568 ) -> Result<(Project, ReplicaId)> {
569 self.transaction(|tx| async move {
570 let (project, hosted_project) = project::Entity::find_by_id(id)
571 .find_also_related(hosted_project::Entity)
572 .one(&*tx)
573 .await?
574 .ok_or_else(|| anyhow!("hosted project is no longer shared"))?;
575
576 let Some(hosted_project) = hosted_project else {
577 return Err(anyhow!("project is not hosted"))?;
578 };
579
580 let channel = channel::Entity::find_by_id(hosted_project.channel_id)
581 .one(&*tx)
582 .await?
583 .ok_or_else(|| anyhow!("no such channel"))?;
584
585 let role = self
586 .check_user_is_channel_participant(&channel, user_id, &tx)
587 .await?;
588
589 self.join_project_internal(project, user_id, connection, role, &tx)
590 .await
591 })
592 .await
593 }
594
595 pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
596 self.transaction(|tx| async move {
597 Ok(project::Entity::find_by_id(id)
598 .one(&*tx)
599 .await?
600 .ok_or_else(|| anyhow!("no such project"))?)
601 })
602 .await
603 }
604
605 pub async fn find_dev_server_project(&self, id: DevServerProjectId) -> Result<project::Model> {
606 self.transaction(|tx| async move {
607 Ok(project::Entity::find()
608 .filter(project::Column::DevServerProjectId.eq(id))
609 .one(&*tx)
610 .await?
611 .ok_or_else(|| anyhow!("no such project"))?)
612 })
613 .await
614 }
615
616 /// Adds the given connection to the specified project
617 /// in the current room.
618 pub async fn join_project(
619 &self,
620 project_id: ProjectId,
621 connection: ConnectionId,
622 user_id: UserId,
623 ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
624 self.project_transaction(project_id, |tx| async move {
625 let (project, role) = self
626 .access_project(
627 project_id,
628 connection,
629 PrincipalId::UserId(user_id),
630 Capability::ReadOnly,
631 &tx,
632 )
633 .await?;
634 self.join_project_internal(project, user_id, connection, role, &tx)
635 .await
636 })
637 .await
638 }
639
    /// Registers `connection` as a non-host collaborator of `project` and returns
    /// a snapshot of the project's state together with the replica id that was
    /// assigned to the new collaborator.
    ///
    /// The snapshot contains all collaborators (including the new one), the
    /// project's worktrees with their non-deleted entries, non-deleted repository
    /// entries, diagnostic summaries and settings files, and the project's
    /// language servers. `role` is copied into the returned `Project` unchanged.
    ///
    /// Authorization is not checked here; the callers in this file do that before
    /// calling this method.
    async fn join_project_internal(
        &self,
        project: project::Model,
        user_id: UserId,
        connection: ConnectionId,
        role: ChannelRole,
        tx: &DatabaseTransaction,
    ) -> Result<(Project, ReplicaId)> {
        let mut collaborators = project
            .find_related(project_collaborator::Entity)
            .all(tx)
            .await?;
        let replica_ids = collaborators
            .iter()
            .map(|c| c.replica_id)
            .collect::<HashSet<_>>();
        // Pick the smallest replica id >= 1 that no existing collaborator uses.
        let mut replica_id = ReplicaId(1);
        while replica_ids.contains(&replica_id) {
            replica_id.0 += 1;
        }
        let new_collaborator = project_collaborator::ActiveModel {
            project_id: ActiveValue::set(project.id),
            connection_id: ActiveValue::set(connection.id as i32),
            connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
            user_id: ActiveValue::set(user_id),
            replica_id: ActiveValue::set(replica_id),
            is_host: ActiveValue::set(false),
            ..Default::default()
        }
        .insert(tx)
        .await?;
        collaborators.push(new_collaborator);

        // Index the worktrees by id so the per-worktree data loaded below can be
        // attached to the right worktree.
        let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
        let mut worktrees = db_worktrees
            .into_iter()
            .map(|db_worktree| {
                (
                    db_worktree.id as u64,
                    Worktree {
                        id: db_worktree.id as u64,
                        abs_path: db_worktree.abs_path,
                        root_name: db_worktree.root_name,
                        visible: db_worktree.visible,
                        entries: Default::default(),
                        repository_entries: Default::default(),
                        diagnostic_summaries: Default::default(),
                        settings_files: Default::default(),
                        scan_id: db_worktree.scan_id as u64,
                        completed_scan_id: db_worktree.completed_scan_id as u64,
                    },
                )
            })
            .collect::<BTreeMap<_, _>>();

        // Populate worktree entries.
        // Rows are streamed rather than loaded all at once; rows that refer to a
        // worktree id not present in `worktrees` are skipped.
        {
            let mut db_entries = worktree_entry::Entity::find()
                .filter(
                    Condition::all()
                        .add(worktree_entry::Column::ProjectId.eq(project.id))
                        .add(worktree_entry::Column::IsDeleted.eq(false)),
                )
                .stream(tx)
                .await?;
            while let Some(db_entry) = db_entries.next().await {
                let db_entry = db_entry?;
                if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
                    worktree.entries.push(proto::Entry {
                        id: db_entry.id as u64,
                        is_dir: db_entry.is_dir,
                        path: db_entry.path,
                        inode: db_entry.inode as u64,
                        mtime: Some(proto::Timestamp {
                            seconds: db_entry.mtime_seconds as u64,
                            nanos: db_entry.mtime_nanos as u32,
                        }),
                        is_symlink: db_entry.is_symlink,
                        is_ignored: db_entry.is_ignored,
                        is_external: db_entry.is_external,
                        git_status: db_entry.git_status.map(|status| status as i32),
                    });
                }
            }
        }

        // Populate repository entries.
        // Keyed by work directory id within each worktree; soft-deleted rows are excluded.
        {
            let mut db_repository_entries = worktree_repository::Entity::find()
                .filter(
                    Condition::all()
                        .add(worktree_repository::Column::ProjectId.eq(project.id))
                        .add(worktree_repository::Column::IsDeleted.eq(false)),
                )
                .stream(tx)
                .await?;
            while let Some(db_repository_entry) = db_repository_entries.next().await {
                let db_repository_entry = db_repository_entry?;
                if let Some(worktree) = worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
                {
                    worktree.repository_entries.insert(
                        db_repository_entry.work_directory_id as u64,
                        proto::RepositoryEntry {
                            work_directory_id: db_repository_entry.work_directory_id as u64,
                            branch: db_repository_entry.branch,
                        },
                    );
                }
            }
        }

        // Populate worktree diagnostic summaries.
        {
            let mut db_summaries = worktree_diagnostic_summary::Entity::find()
                .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
                .stream(tx)
                .await?;
            while let Some(db_summary) = db_summaries.next().await {
                let db_summary = db_summary?;
                if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
                    worktree
                        .diagnostic_summaries
                        .push(proto::DiagnosticSummary {
                            path: db_summary.path,
                            language_server_id: db_summary.language_server_id as u64,
                            error_count: db_summary.error_count as u32,
                            warning_count: db_summary.warning_count as u32,
                        });
                }
            }
        }

        // Populate worktree settings files
        {
            let mut db_settings_files = worktree_settings_file::Entity::find()
                .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
                .stream(tx)
                .await?;
            while let Some(db_settings_file) = db_settings_files.next().await {
                let db_settings_file = db_settings_file?;
                if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
                    worktree.settings_files.push(WorktreeSettingsFile {
                        path: db_settings_file.path,
                        content: db_settings_file.content,
                    });
                }
            }
        }

        // Populate language servers.
        let language_servers = project
            .find_related(language_server::Entity)
            .all(tx)
            .await?;

        // Assemble the snapshot returned to the joining connection.
        let project = Project {
            id: project.id,
            role,
            collaborators: collaborators
                .into_iter()
                .map(|collaborator| ProjectCollaborator {
                    connection_id: collaborator.connection(),
                    user_id: collaborator.user_id,
                    replica_id: collaborator.replica_id,
                    is_host: collaborator.is_host,
                })
                .collect(),
            worktrees,
            language_servers: language_servers
                .into_iter()
                .map(|language_server| proto::LanguageServer {
                    id: language_server.id as u64,
                    name: language_server.name,
                })
                .collect(),
            dev_server_project_id: project.dev_server_project_id,
        };
        Ok((project, replica_id as ReplicaId))
    }
819
820 pub async fn leave_hosted_project(
821 &self,
822 project_id: ProjectId,
823 connection: ConnectionId,
824 ) -> Result<LeftProject> {
825 self.transaction(|tx| async move {
826 let result = project_collaborator::Entity::delete_many()
827 .filter(
828 Condition::all()
829 .add(project_collaborator::Column::ProjectId.eq(project_id))
830 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
831 .add(
832 project_collaborator::Column::ConnectionServerId
833 .eq(connection.owner_id as i32),
834 ),
835 )
836 .exec(&*tx)
837 .await?;
838 if result.rows_affected == 0 {
839 return Err(anyhow!("not in the project"))?;
840 }
841
842 let project = project::Entity::find_by_id(project_id)
843 .one(&*tx)
844 .await?
845 .ok_or_else(|| anyhow!("no such project"))?;
846 let collaborators = project
847 .find_related(project_collaborator::Entity)
848 .all(&*tx)
849 .await?;
850 let connection_ids = collaborators
851 .into_iter()
852 .map(|collaborator| collaborator.connection())
853 .collect();
854 Ok(LeftProject {
855 id: project.id,
856 connection_ids,
857 should_unshare: false,
858 })
859 })
860 .await
861 }
862
863 /// Removes the given connection from the specified project.
864 pub async fn leave_project(
865 &self,
866 project_id: ProjectId,
867 connection: ConnectionId,
868 ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
869 self.project_transaction(project_id, |tx| async move {
870 let result = project_collaborator::Entity::delete_many()
871 .filter(
872 Condition::all()
873 .add(project_collaborator::Column::ProjectId.eq(project_id))
874 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
875 .add(
876 project_collaborator::Column::ConnectionServerId
877 .eq(connection.owner_id as i32),
878 ),
879 )
880 .exec(&*tx)
881 .await?;
882 if result.rows_affected == 0 {
883 Err(anyhow!("not a collaborator on this project"))?;
884 }
885
886 let project = project::Entity::find_by_id(project_id)
887 .one(&*tx)
888 .await?
889 .ok_or_else(|| anyhow!("no such project"))?;
890 let collaborators = project
891 .find_related(project_collaborator::Entity)
892 .all(&*tx)
893 .await?;
894 let connection_ids: Vec<ConnectionId> = collaborators
895 .into_iter()
896 .map(|collaborator| collaborator.connection())
897 .collect();
898
899 follower::Entity::delete_many()
900 .filter(
901 Condition::any()
902 .add(
903 Condition::all()
904 .add(follower::Column::ProjectId.eq(Some(project_id)))
905 .add(
906 follower::Column::LeaderConnectionServerId
907 .eq(connection.owner_id),
908 )
909 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
910 )
911 .add(
912 Condition::all()
913 .add(follower::Column::ProjectId.eq(Some(project_id)))
914 .add(
915 follower::Column::FollowerConnectionServerId
916 .eq(connection.owner_id),
917 )
918 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
919 ),
920 )
921 .exec(&*tx)
922 .await?;
923
924 let room = if let Some(room_id) = project.room_id {
925 Some(self.get_room(room_id, &tx).await?)
926 } else {
927 None
928 };
929
930 let left_project = LeftProject {
931 id: project_id,
932 should_unshare: connection == project.host_connection()?,
933 connection_ids,
934 };
935 Ok((room, left_project))
936 })
937 .await
938 }
939
940 pub async fn check_user_is_project_host(
941 &self,
942 project_id: ProjectId,
943 connection_id: ConnectionId,
944 ) -> Result<()> {
945 self.project_transaction(project_id, |tx| async move {
946 project::Entity::find()
947 .filter(
948 Condition::all()
949 .add(project::Column::Id.eq(project_id))
950 .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
951 .add(
952 project::Column::HostConnectionServerId
953 .eq(Some(connection_id.owner_id as i32)),
954 ),
955 )
956 .one(&*tx)
957 .await?
958 .ok_or_else(|| anyhow!("failed to read project host"))?;
959
960 Ok(())
961 })
962 .await
963 .map(|guard| guard.into_inner())
964 }
965
966 /// Returns the current project if the given user is authorized to access it with the specified capability.
967 pub async fn access_project(
968 &self,
969 project_id: ProjectId,
970 connection_id: ConnectionId,
971 principal_id: PrincipalId,
972 capability: Capability,
973 tx: &DatabaseTransaction,
974 ) -> Result<(project::Model, ChannelRole)> {
975 let (mut project, dev_server_project) = project::Entity::find_by_id(project_id)
976 .find_also_related(dev_server_project::Entity)
977 .one(tx)
978 .await?
979 .ok_or_else(|| anyhow!("no such project"))?;
980
981 let user_id = match principal_id {
982 PrincipalId::DevServerId(_) => {
983 if project
984 .host_connection()
985 .is_ok_and(|connection| connection == connection_id)
986 {
987 return Ok((project, ChannelRole::Admin));
988 }
989 return Err(anyhow!("not the project host"))?;
990 }
991 PrincipalId::UserId(user_id) => user_id,
992 };
993
994 let role_from_room = if let Some(room_id) = project.room_id {
995 room_participant::Entity::find()
996 .filter(room_participant::Column::RoomId.eq(room_id))
997 .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
998 .one(tx)
999 .await?
1000 .and_then(|participant| participant.role)
1001 } else {
1002 None
1003 };
1004 let role_from_dev_server = if let Some(dev_server_project) = dev_server_project {
1005 let dev_server = dev_server::Entity::find_by_id(dev_server_project.dev_server_id)
1006 .one(tx)
1007 .await?
1008 .ok_or_else(|| anyhow!("no such channel"))?;
1009 if user_id == dev_server.user_id {
1010 // If the user left the room "uncleanly" they may rejoin the
1011 // remote project before leave_room runs. IN that case kick
1012 // the project out of the room pre-emptively.
1013 if role_from_room.is_none() {
1014 project = project::Entity::update(project::ActiveModel {
1015 room_id: ActiveValue::Set(None),
1016 ..project.into_active_model()
1017 })
1018 .exec(tx)
1019 .await?;
1020 }
1021 Some(ChannelRole::Admin)
1022 } else {
1023 None
1024 }
1025 } else {
1026 None
1027 };
1028
1029 let role = role_from_dev_server
1030 .or(role_from_room)
1031 .unwrap_or(ChannelRole::Banned);
1032
1033 match capability {
1034 Capability::ReadWrite => {
1035 if !role.can_edit_projects() {
1036 return Err(anyhow!("not authorized to edit projects"))?;
1037 }
1038 }
1039 Capability::ReadOnly => {
1040 if !role.can_read_projects() {
1041 return Err(anyhow!("not authorized to read projects"))?;
1042 }
1043 }
1044 }
1045
1046 Ok((project, role))
1047 }
1048
1049 /// Returns the host connection for a read-only request to join a shared project.
1050 pub async fn host_for_read_only_project_request(
1051 &self,
1052 project_id: ProjectId,
1053 connection_id: ConnectionId,
1054 user_id: UserId,
1055 ) -> Result<ConnectionId> {
1056 self.project_transaction(project_id, |tx| async move {
1057 let (project, _) = self
1058 .access_project(
1059 project_id,
1060 connection_id,
1061 PrincipalId::UserId(user_id),
1062 Capability::ReadOnly,
1063 &tx,
1064 )
1065 .await?;
1066 project.host_connection()
1067 })
1068 .await
1069 .map(|guard| guard.into_inner())
1070 }
1071
1072 /// Returns the host connection for a request to join a shared project.
1073 pub async fn host_for_mutating_project_request(
1074 &self,
1075 project_id: ProjectId,
1076 connection_id: ConnectionId,
1077 user_id: UserId,
1078 ) -> Result<ConnectionId> {
1079 self.project_transaction(project_id, |tx| async move {
1080 let (project, _) = self
1081 .access_project(
1082 project_id,
1083 connection_id,
1084 PrincipalId::UserId(user_id),
1085 Capability::ReadWrite,
1086 &tx,
1087 )
1088 .await?;
1089 project.host_connection()
1090 })
1091 .await
1092 .map(|guard| guard.into_inner())
1093 }
1094
1095 pub async fn connections_for_buffer_update(
1096 &self,
1097 project_id: ProjectId,
1098 principal_id: PrincipalId,
1099 connection_id: ConnectionId,
1100 capability: Capability,
1101 ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1102 self.project_transaction(project_id, |tx| async move {
1103 // Authorize
1104 let (project, _) = self
1105 .access_project(project_id, connection_id, principal_id, capability, &tx)
1106 .await?;
1107
1108 let host_connection_id = project.host_connection()?;
1109
1110 let collaborators = project_collaborator::Entity::find()
1111 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1112 .all(&*tx)
1113 .await?;
1114
1115 let guest_connection_ids = collaborators
1116 .into_iter()
1117 .filter_map(|collaborator| {
1118 if collaborator.is_host {
1119 None
1120 } else {
1121 Some(collaborator.connection())
1122 }
1123 })
1124 .collect();
1125
1126 Ok((host_connection_id, guest_connection_ids))
1127 })
1128 .await
1129 }
1130
1131 /// Returns the connection IDs in the given project.
1132 ///
1133 /// The provided `connection_id` must also be a collaborator in the project,
1134 /// otherwise an error will be returned.
1135 pub async fn project_connection_ids(
1136 &self,
1137 project_id: ProjectId,
1138 connection_id: ConnectionId,
1139 exclude_dev_server: bool,
1140 ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1141 self.project_transaction(project_id, |tx| async move {
1142 let project = project::Entity::find_by_id(project_id)
1143 .one(&*tx)
1144 .await?
1145 .ok_or_else(|| anyhow!("no such project"))?;
1146
1147 let mut collaborators = project_collaborator::Entity::find()
1148 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1149 .stream(&*tx)
1150 .await?;
1151
1152 let mut connection_ids = HashSet::default();
1153 if let Some(host_connection) = project.host_connection().log_err() {
1154 if !exclude_dev_server {
1155 connection_ids.insert(host_connection);
1156 }
1157 }
1158
1159 while let Some(collaborator) = collaborators.next().await {
1160 let collaborator = collaborator?;
1161 connection_ids.insert(collaborator.connection());
1162 }
1163
1164 if connection_ids.contains(&connection_id)
1165 || Some(connection_id) == project.host_connection().ok()
1166 {
1167 Ok(connection_ids)
1168 } else {
1169 Err(anyhow!(
1170 "can only send project updates to a project you're in"
1171 ))?
1172 }
1173 })
1174 .await
1175 }
1176
1177 async fn project_guest_connection_ids(
1178 &self,
1179 project_id: ProjectId,
1180 tx: &DatabaseTransaction,
1181 ) -> Result<Vec<ConnectionId>> {
1182 let mut collaborators = project_collaborator::Entity::find()
1183 .filter(
1184 project_collaborator::Column::ProjectId
1185 .eq(project_id)
1186 .and(project_collaborator::Column::IsHost.eq(false)),
1187 )
1188 .stream(tx)
1189 .await?;
1190
1191 let mut guest_connection_ids = Vec::new();
1192 while let Some(collaborator) = collaborators.next().await {
1193 let collaborator = collaborator?;
1194 guest_connection_ids.push(collaborator.connection());
1195 }
1196 Ok(guest_connection_ids)
1197 }
1198
1199 /// Returns the [`RoomId`] for the given project.
1200 pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1201 self.transaction(|tx| async move {
1202 Ok(project::Entity::find_by_id(project_id)
1203 .one(&*tx)
1204 .await?
1205 .and_then(|project| project.room_id))
1206 })
1207 .await
1208 }
1209
1210 pub async fn check_room_participants(
1211 &self,
1212 room_id: RoomId,
1213 leader_id: ConnectionId,
1214 follower_id: ConnectionId,
1215 ) -> Result<()> {
1216 self.transaction(|tx| async move {
1217 use room_participant::Column;
1218
1219 let count = room_participant::Entity::find()
1220 .filter(
1221 Condition::all().add(Column::RoomId.eq(room_id)).add(
1222 Condition::any()
1223 .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1224 Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1225 ))
1226 .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1227 Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1228 )),
1229 ),
1230 )
1231 .count(&*tx)
1232 .await?;
1233
1234 if count < 2 {
1235 Err(anyhow!("not room participants"))?;
1236 }
1237
1238 Ok(())
1239 })
1240 .await
1241 }
1242
1243 /// Adds the given follower connection as a follower of the given leader connection.
1244 pub async fn follow(
1245 &self,
1246 room_id: RoomId,
1247 project_id: ProjectId,
1248 leader_connection: ConnectionId,
1249 follower_connection: ConnectionId,
1250 ) -> Result<TransactionGuard<proto::Room>> {
1251 self.room_transaction(room_id, |tx| async move {
1252 follower::ActiveModel {
1253 room_id: ActiveValue::set(room_id),
1254 project_id: ActiveValue::set(project_id),
1255 leader_connection_server_id: ActiveValue::set(ServerId(
1256 leader_connection.owner_id as i32,
1257 )),
1258 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1259 follower_connection_server_id: ActiveValue::set(ServerId(
1260 follower_connection.owner_id as i32,
1261 )),
1262 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1263 ..Default::default()
1264 }
1265 .insert(&*tx)
1266 .await?;
1267
1268 let room = self.get_room(room_id, &tx).await?;
1269 Ok(room)
1270 })
1271 .await
1272 }
1273
1274 /// Removes the given follower connection as a follower of the given leader connection.
1275 pub async fn unfollow(
1276 &self,
1277 room_id: RoomId,
1278 project_id: ProjectId,
1279 leader_connection: ConnectionId,
1280 follower_connection: ConnectionId,
1281 ) -> Result<TransactionGuard<proto::Room>> {
1282 self.room_transaction(room_id, |tx| async move {
1283 follower::Entity::delete_many()
1284 .filter(
1285 Condition::all()
1286 .add(follower::Column::RoomId.eq(room_id))
1287 .add(follower::Column::ProjectId.eq(project_id))
1288 .add(
1289 follower::Column::LeaderConnectionServerId
1290 .eq(leader_connection.owner_id),
1291 )
1292 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1293 .add(
1294 follower::Column::FollowerConnectionServerId
1295 .eq(follower_connection.owner_id),
1296 )
1297 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1298 )
1299 .exec(&*tx)
1300 .await?;
1301
1302 let room = self.get_room(room_id, &tx).await?;
1303 Ok(room)
1304 })
1305 .await
1306 }
1307}