1use anyhow::Context as _;
2use util::ResultExt;
3
4use super::*;
5
6impl Database {
7 /// Returns the count of all projects, excluding ones marked as admin.
8 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
9 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
10 enum QueryAs {
11 Count,
12 }
13
14 self.transaction(|tx| async move {
15 Ok(project::Entity::find()
16 .select_only()
17 .column_as(project::Column::Id.count(), QueryAs::Count)
18 .inner_join(user::Entity)
19 .filter(user::Column::Admin.eq(false))
20 .into_values::<_, QueryAs>()
21 .one(&*tx)
22 .await?
23 .unwrap_or(0i64) as usize)
24 })
25 .await
26 }
27
28 /// Shares a project with the given room.
29 pub async fn share_project(
30 &self,
31 room_id: RoomId,
32 connection: ConnectionId,
33 worktrees: &[proto::WorktreeMetadata],
34 is_ssh_project: bool,
35 dev_server_project_id: Option<DevServerProjectId>,
36 ) -> Result<TransactionGuard<(ProjectId, proto::Room)>> {
37 self.room_transaction(room_id, |tx| async move {
38 let participant = room_participant::Entity::find()
39 .filter(
40 Condition::all()
41 .add(
42 room_participant::Column::AnsweringConnectionId
43 .eq(connection.id as i32),
44 )
45 .add(
46 room_participant::Column::AnsweringConnectionServerId
47 .eq(connection.owner_id as i32),
48 ),
49 )
50 .one(&*tx)
51 .await?
52 .ok_or_else(|| anyhow!("could not find participant"))?;
53 if participant.room_id != room_id {
54 return Err(anyhow!("shared project on unexpected room"))?;
55 }
56 if !participant
57 .role
58 .unwrap_or(ChannelRole::Member)
59 .can_edit_projects()
60 {
61 return Err(anyhow!("guests cannot share projects"))?;
62 }
63
64 if let Some(dev_server_project_id) = dev_server_project_id {
65 let project = project::Entity::find()
66 .filter(project::Column::DevServerProjectId.eq(Some(dev_server_project_id)))
67 .one(&*tx)
68 .await?
69 .ok_or_else(|| anyhow!("no remote project"))?;
70
71 let (_, dev_server) = dev_server_project::Entity::find_by_id(dev_server_project_id)
72 .find_also_related(dev_server::Entity)
73 .one(&*tx)
74 .await?
75 .ok_or_else(|| anyhow!("no dev_server_project"))?;
76
77 if !dev_server.is_some_and(|dev_server| dev_server.user_id == participant.user_id) {
78 return Err(anyhow!("not your dev server"))?;
79 }
80
81 if project.room_id.is_some() {
82 return Err(anyhow!("project already shared"))?;
83 };
84
85 let project = project::Entity::update(project::ActiveModel {
86 room_id: ActiveValue::Set(Some(room_id)),
87 ..project.into_active_model()
88 })
89 .exec(&*tx)
90 .await?;
91
92 let room = self.get_room(room_id, &tx).await?;
93 return Ok((project.id, room));
94 }
95
96 let project = project::ActiveModel {
97 room_id: ActiveValue::set(Some(participant.room_id)),
98 host_user_id: ActiveValue::set(Some(participant.user_id)),
99 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
100 host_connection_server_id: ActiveValue::set(Some(ServerId(
101 connection.owner_id as i32,
102 ))),
103 id: ActiveValue::NotSet,
104 hosted_project_id: ActiveValue::Set(None),
105 dev_server_project_id: ActiveValue::Set(None),
106 }
107 .insert(&*tx)
108 .await?;
109
110 if !worktrees.is_empty() {
111 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
112 worktree::ActiveModel {
113 id: ActiveValue::set(worktree.id as i64),
114 project_id: ActiveValue::set(project.id),
115 abs_path: ActiveValue::set(worktree.abs_path.clone()),
116 root_name: ActiveValue::set(worktree.root_name.clone()),
117 visible: ActiveValue::set(worktree.visible),
118 scan_id: ActiveValue::set(0),
119 completed_scan_id: ActiveValue::set(0),
120 }
121 }))
122 .exec(&*tx)
123 .await?;
124 }
125
126 let replica_id = if is_ssh_project { 1 } else { 0 };
127
128 project_collaborator::ActiveModel {
129 project_id: ActiveValue::set(project.id),
130 connection_id: ActiveValue::set(connection.id as i32),
131 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
132 user_id: ActiveValue::set(participant.user_id),
133 replica_id: ActiveValue::set(ReplicaId(replica_id)),
134 is_host: ActiveValue::set(true),
135 ..Default::default()
136 }
137 .insert(&*tx)
138 .await?;
139
140 let room = self.get_room(room_id, &tx).await?;
141 Ok((project.id, room))
142 })
143 .await
144 }
145
146 pub async fn delete_project(&self, project_id: ProjectId) -> Result<()> {
147 self.weak_transaction(|tx| async move {
148 project::Entity::delete_by_id(project_id).exec(&*tx).await?;
149 Ok(())
150 })
151 .await
152 }
153
154 /// Unshares the given project.
155 pub async fn unshare_project(
156 &self,
157 project_id: ProjectId,
158 connection: ConnectionId,
159 user_id: Option<UserId>,
160 ) -> Result<TransactionGuard<(bool, Option<proto::Room>, Vec<ConnectionId>)>> {
161 self.project_transaction(project_id, |tx| async move {
162 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
163 let project = project::Entity::find_by_id(project_id)
164 .one(&*tx)
165 .await?
166 .ok_or_else(|| anyhow!("project not found"))?;
167 let room = if let Some(room_id) = project.room_id {
168 Some(self.get_room(room_id, &tx).await?)
169 } else {
170 None
171 };
172 if project.host_connection()? == connection {
173 return Ok((true, room, guest_connection_ids));
174 }
175 if let Some(dev_server_project_id) = project.dev_server_project_id {
176 if let Some(user_id) = user_id {
177 if user_id
178 != self
179 .owner_for_dev_server_project(dev_server_project_id, &tx)
180 .await?
181 {
182 Err(anyhow!("cannot unshare a project hosted by another user"))?
183 }
184 project::Entity::update(project::ActiveModel {
185 room_id: ActiveValue::Set(None),
186 ..project.into_active_model()
187 })
188 .exec(&*tx)
189 .await?;
190 return Ok((false, room, guest_connection_ids));
191 }
192 }
193
194 Err(anyhow!("cannot unshare a project hosted by another user"))?
195 })
196 .await
197 }
198
199 /// Updates the worktrees associated with the given project.
200 pub async fn update_project(
201 &self,
202 project_id: ProjectId,
203 connection: ConnectionId,
204 worktrees: &[proto::WorktreeMetadata],
205 ) -> Result<TransactionGuard<(Option<proto::Room>, Vec<ConnectionId>)>> {
206 self.project_transaction(project_id, |tx| async move {
207 let project = project::Entity::find_by_id(project_id)
208 .filter(
209 Condition::all()
210 .add(project::Column::HostConnectionId.eq(connection.id as i32))
211 .add(
212 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
213 ),
214 )
215 .one(&*tx)
216 .await?
217 .ok_or_else(|| anyhow!("no such project"))?;
218
219 self.update_project_worktrees(project.id, worktrees, &tx)
220 .await?;
221
222 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
223
224 let room = if let Some(room_id) = project.room_id {
225 Some(self.get_room(room_id, &tx).await?)
226 } else {
227 None
228 };
229
230 Ok((room, guest_connection_ids))
231 })
232 .await
233 }
234
235 pub(in crate::db) async fn update_project_worktrees(
236 &self,
237 project_id: ProjectId,
238 worktrees: &[proto::WorktreeMetadata],
239 tx: &DatabaseTransaction,
240 ) -> Result<()> {
241 if !worktrees.is_empty() {
242 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
243 id: ActiveValue::set(worktree.id as i64),
244 project_id: ActiveValue::set(project_id),
245 abs_path: ActiveValue::set(worktree.abs_path.clone()),
246 root_name: ActiveValue::set(worktree.root_name.clone()),
247 visible: ActiveValue::set(worktree.visible),
248 scan_id: ActiveValue::set(0),
249 completed_scan_id: ActiveValue::set(0),
250 }))
251 .on_conflict(
252 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
253 .update_column(worktree::Column::RootName)
254 .to_owned(),
255 )
256 .exec(tx)
257 .await?;
258 }
259
260 worktree::Entity::delete_many()
261 .filter(worktree::Column::ProjectId.eq(project_id).and(
262 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
263 ))
264 .exec(tx)
265 .await?;
266
267 Ok(())
268 }
269
270 pub async fn update_worktree(
271 &self,
272 update: &proto::UpdateWorktree,
273 connection: ConnectionId,
274 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
275 if update.removed_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
276 || update.updated_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
277 {
278 return Err(anyhow!(
279 "invalid worktree update. removed entries: {}, updated entries: {}",
280 update.removed_entries.len(),
281 update.updated_entries.len()
282 ))?;
283 }
284
285 let project_id = ProjectId::from_proto(update.project_id);
286 let worktree_id = update.worktree_id as i64;
287 self.project_transaction(project_id, |tx| async move {
288 // Ensure the update comes from the host.
289 let _project = project::Entity::find_by_id(project_id)
290 .filter(
291 Condition::all()
292 .add(project::Column::HostConnectionId.eq(connection.id as i32))
293 .add(
294 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
295 ),
296 )
297 .one(&*tx)
298 .await?
299 .ok_or_else(|| anyhow!("no such project: {project_id}"))?;
300
301 // Update metadata.
302 worktree::Entity::update(worktree::ActiveModel {
303 id: ActiveValue::set(worktree_id),
304 project_id: ActiveValue::set(project_id),
305 root_name: ActiveValue::set(update.root_name.clone()),
306 scan_id: ActiveValue::set(update.scan_id as i64),
307 completed_scan_id: if update.is_last_update {
308 ActiveValue::set(update.scan_id as i64)
309 } else {
310 ActiveValue::default()
311 },
312 abs_path: ActiveValue::set(update.abs_path.clone()),
313 ..Default::default()
314 })
315 .exec(&*tx)
316 .await?;
317
318 if !update.updated_entries.is_empty() {
319 worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
320 let mtime = entry.mtime.clone().unwrap_or_default();
321 worktree_entry::ActiveModel {
322 project_id: ActiveValue::set(project_id),
323 worktree_id: ActiveValue::set(worktree_id),
324 id: ActiveValue::set(entry.id as i64),
325 is_dir: ActiveValue::set(entry.is_dir),
326 path: ActiveValue::set(entry.path.clone()),
327 inode: ActiveValue::set(entry.inode as i64),
328 mtime_seconds: ActiveValue::set(mtime.seconds as i64),
329 mtime_nanos: ActiveValue::set(mtime.nanos as i32),
330 canonical_path: ActiveValue::set(entry.canonical_path.clone()),
331 is_ignored: ActiveValue::set(entry.is_ignored),
332 is_external: ActiveValue::set(entry.is_external),
333 git_status: ActiveValue::set(entry.git_status.map(|status| status as i64)),
334 is_deleted: ActiveValue::set(false),
335 scan_id: ActiveValue::set(update.scan_id as i64),
336 is_fifo: ActiveValue::set(entry.is_fifo),
337 }
338 }))
339 .on_conflict(
340 OnConflict::columns([
341 worktree_entry::Column::ProjectId,
342 worktree_entry::Column::WorktreeId,
343 worktree_entry::Column::Id,
344 ])
345 .update_columns([
346 worktree_entry::Column::IsDir,
347 worktree_entry::Column::Path,
348 worktree_entry::Column::Inode,
349 worktree_entry::Column::MtimeSeconds,
350 worktree_entry::Column::MtimeNanos,
351 worktree_entry::Column::CanonicalPath,
352 worktree_entry::Column::IsIgnored,
353 worktree_entry::Column::GitStatus,
354 worktree_entry::Column::ScanId,
355 ])
356 .to_owned(),
357 )
358 .exec(&*tx)
359 .await?;
360 }
361
362 if !update.removed_entries.is_empty() {
363 worktree_entry::Entity::update_many()
364 .filter(
365 worktree_entry::Column::ProjectId
366 .eq(project_id)
367 .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
368 .and(
369 worktree_entry::Column::Id
370 .is_in(update.removed_entries.iter().map(|id| *id as i64)),
371 ),
372 )
373 .set(worktree_entry::ActiveModel {
374 is_deleted: ActiveValue::Set(true),
375 scan_id: ActiveValue::Set(update.scan_id as i64),
376 ..Default::default()
377 })
378 .exec(&*tx)
379 .await?;
380 }
381
382 if !update.updated_repositories.is_empty() {
383 worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
384 |repository| worktree_repository::ActiveModel {
385 project_id: ActiveValue::set(project_id),
386 worktree_id: ActiveValue::set(worktree_id),
387 work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
388 scan_id: ActiveValue::set(update.scan_id as i64),
389 branch: ActiveValue::set(repository.branch.clone()),
390 is_deleted: ActiveValue::set(false),
391 },
392 ))
393 .on_conflict(
394 OnConflict::columns([
395 worktree_repository::Column::ProjectId,
396 worktree_repository::Column::WorktreeId,
397 worktree_repository::Column::WorkDirectoryId,
398 ])
399 .update_columns([
400 worktree_repository::Column::ScanId,
401 worktree_repository::Column::Branch,
402 ])
403 .to_owned(),
404 )
405 .exec(&*tx)
406 .await?;
407 }
408
409 if !update.removed_repositories.is_empty() {
410 worktree_repository::Entity::update_many()
411 .filter(
412 worktree_repository::Column::ProjectId
413 .eq(project_id)
414 .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
415 .and(
416 worktree_repository::Column::WorkDirectoryId
417 .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
418 ),
419 )
420 .set(worktree_repository::ActiveModel {
421 is_deleted: ActiveValue::Set(true),
422 scan_id: ActiveValue::Set(update.scan_id as i64),
423 ..Default::default()
424 })
425 .exec(&*tx)
426 .await?;
427 }
428
429 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
430 Ok(connection_ids)
431 })
432 .await
433 }
434
435 /// Updates the diagnostic summary for the given connection.
436 pub async fn update_diagnostic_summary(
437 &self,
438 update: &proto::UpdateDiagnosticSummary,
439 connection: ConnectionId,
440 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
441 let project_id = ProjectId::from_proto(update.project_id);
442 let worktree_id = update.worktree_id as i64;
443 self.project_transaction(project_id, |tx| async move {
444 let summary = update
445 .summary
446 .as_ref()
447 .ok_or_else(|| anyhow!("invalid summary"))?;
448
449 // Ensure the update comes from the host.
450 let project = project::Entity::find_by_id(project_id)
451 .one(&*tx)
452 .await?
453 .ok_or_else(|| anyhow!("no such project"))?;
454 if project.host_connection()? != connection {
455 return Err(anyhow!("can't update a project hosted by someone else"))?;
456 }
457
458 // Update summary.
459 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
460 project_id: ActiveValue::set(project_id),
461 worktree_id: ActiveValue::set(worktree_id),
462 path: ActiveValue::set(summary.path.clone()),
463 language_server_id: ActiveValue::set(summary.language_server_id as i64),
464 error_count: ActiveValue::set(summary.error_count as i32),
465 warning_count: ActiveValue::set(summary.warning_count as i32),
466 })
467 .on_conflict(
468 OnConflict::columns([
469 worktree_diagnostic_summary::Column::ProjectId,
470 worktree_diagnostic_summary::Column::WorktreeId,
471 worktree_diagnostic_summary::Column::Path,
472 ])
473 .update_columns([
474 worktree_diagnostic_summary::Column::LanguageServerId,
475 worktree_diagnostic_summary::Column::ErrorCount,
476 worktree_diagnostic_summary::Column::WarningCount,
477 ])
478 .to_owned(),
479 )
480 .exec(&*tx)
481 .await?;
482
483 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
484 Ok(connection_ids)
485 })
486 .await
487 }
488
489 /// Starts the language server for the given connection.
490 pub async fn start_language_server(
491 &self,
492 update: &proto::StartLanguageServer,
493 connection: ConnectionId,
494 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
495 let project_id = ProjectId::from_proto(update.project_id);
496 self.project_transaction(project_id, |tx| async move {
497 let server = update
498 .server
499 .as_ref()
500 .ok_or_else(|| anyhow!("invalid language server"))?;
501
502 // Ensure the update comes from the host.
503 let project = project::Entity::find_by_id(project_id)
504 .one(&*tx)
505 .await?
506 .ok_or_else(|| anyhow!("no such project"))?;
507 if project.host_connection()? != connection {
508 return Err(anyhow!("can't update a project hosted by someone else"))?;
509 }
510
511 // Add the newly-started language server.
512 language_server::Entity::insert(language_server::ActiveModel {
513 project_id: ActiveValue::set(project_id),
514 id: ActiveValue::set(server.id as i64),
515 name: ActiveValue::set(server.name.clone()),
516 })
517 .on_conflict(
518 OnConflict::columns([
519 language_server::Column::ProjectId,
520 language_server::Column::Id,
521 ])
522 .update_column(language_server::Column::Name)
523 .to_owned(),
524 )
525 .exec(&*tx)
526 .await?;
527
528 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
529 Ok(connection_ids)
530 })
531 .await
532 }
533
534 /// Updates the worktree settings for the given connection.
535 pub async fn update_worktree_settings(
536 &self,
537 update: &proto::UpdateWorktreeSettings,
538 connection: ConnectionId,
539 ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
540 let project_id = ProjectId::from_proto(update.project_id);
541 let kind = match update.kind {
542 Some(kind) => proto::LocalSettingsKind::from_i32(kind)
543 .with_context(|| format!("unknown worktree settings kind: {kind}"))?,
544 None => proto::LocalSettingsKind::Settings,
545 };
546 let kind = LocalSettingsKind::from_proto(kind);
547 self.project_transaction(project_id, |tx| async move {
548 // Ensure the update comes from the host.
549 let project = project::Entity::find_by_id(project_id)
550 .one(&*tx)
551 .await?
552 .ok_or_else(|| anyhow!("no such project"))?;
553 if project.host_connection()? != connection {
554 return Err(anyhow!("can't update a project hosted by someone else"))?;
555 }
556
557 if let Some(content) = &update.content {
558 worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
559 project_id: ActiveValue::Set(project_id),
560 worktree_id: ActiveValue::Set(update.worktree_id as i64),
561 path: ActiveValue::Set(update.path.clone()),
562 content: ActiveValue::Set(content.clone()),
563 kind: ActiveValue::Set(kind),
564 })
565 .on_conflict(
566 OnConflict::columns([
567 worktree_settings_file::Column::ProjectId,
568 worktree_settings_file::Column::WorktreeId,
569 worktree_settings_file::Column::Path,
570 ])
571 .update_column(worktree_settings_file::Column::Content)
572 .to_owned(),
573 )
574 .exec(&*tx)
575 .await?;
576 } else {
577 worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
578 project_id: ActiveValue::Set(project_id),
579 worktree_id: ActiveValue::Set(update.worktree_id as i64),
580 path: ActiveValue::Set(update.path.clone()),
581 ..Default::default()
582 })
583 .exec(&*tx)
584 .await?;
585 }
586
587 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
588 Ok(connection_ids)
589 })
590 .await
591 }
592
593 /// Adds the given connection to the specified hosted project
594 pub async fn join_hosted_project(
595 &self,
596 id: ProjectId,
597 user_id: UserId,
598 connection: ConnectionId,
599 ) -> Result<(Project, ReplicaId)> {
600 self.transaction(|tx| async move {
601 let (project, hosted_project) = project::Entity::find_by_id(id)
602 .find_also_related(hosted_project::Entity)
603 .one(&*tx)
604 .await?
605 .ok_or_else(|| anyhow!("hosted project is no longer shared"))?;
606
607 let Some(hosted_project) = hosted_project else {
608 return Err(anyhow!("project is not hosted"))?;
609 };
610
611 let channel = channel::Entity::find_by_id(hosted_project.channel_id)
612 .one(&*tx)
613 .await?
614 .ok_or_else(|| anyhow!("no such channel"))?;
615
616 let role = self
617 .check_user_is_channel_participant(&channel, user_id, &tx)
618 .await?;
619
620 self.join_project_internal(project, user_id, connection, role, &tx)
621 .await
622 })
623 .await
624 }
625
626 pub async fn get_project(&self, id: ProjectId) -> Result<project::Model> {
627 self.transaction(|tx| async move {
628 Ok(project::Entity::find_by_id(id)
629 .one(&*tx)
630 .await?
631 .ok_or_else(|| anyhow!("no such project"))?)
632 })
633 .await
634 }
635
636 pub async fn find_dev_server_project(&self, id: DevServerProjectId) -> Result<project::Model> {
637 self.transaction(|tx| async move {
638 Ok(project::Entity::find()
639 .filter(project::Column::DevServerProjectId.eq(id))
640 .one(&*tx)
641 .await?
642 .ok_or_else(|| anyhow!("no such project"))?)
643 })
644 .await
645 }
646
647 /// Adds the given connection to the specified project
648 /// in the current room.
649 pub async fn join_project(
650 &self,
651 project_id: ProjectId,
652 connection: ConnectionId,
653 user_id: UserId,
654 ) -> Result<TransactionGuard<(Project, ReplicaId)>> {
655 self.project_transaction(project_id, |tx| async move {
656 let (project, role) = self
657 .access_project(
658 project_id,
659 connection,
660 PrincipalId::UserId(user_id),
661 Capability::ReadOnly,
662 &tx,
663 )
664 .await?;
665 self.join_project_internal(project, user_id, connection, role, &tx)
666 .await
667 })
668 .await
669 }
670
671 async fn join_project_internal(
672 &self,
673 project: project::Model,
674 user_id: UserId,
675 connection: ConnectionId,
676 role: ChannelRole,
677 tx: &DatabaseTransaction,
678 ) -> Result<(Project, ReplicaId)> {
679 let mut collaborators = project
680 .find_related(project_collaborator::Entity)
681 .all(tx)
682 .await?;
683 let replica_ids = collaborators
684 .iter()
685 .map(|c| c.replica_id)
686 .collect::<HashSet<_>>();
687 let mut replica_id = ReplicaId(1);
688 while replica_ids.contains(&replica_id) {
689 replica_id.0 += 1;
690 }
691 let new_collaborator = project_collaborator::ActiveModel {
692 project_id: ActiveValue::set(project.id),
693 connection_id: ActiveValue::set(connection.id as i32),
694 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
695 user_id: ActiveValue::set(user_id),
696 replica_id: ActiveValue::set(replica_id),
697 is_host: ActiveValue::set(false),
698 ..Default::default()
699 }
700 .insert(tx)
701 .await?;
702 collaborators.push(new_collaborator);
703
704 let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
705 let mut worktrees = db_worktrees
706 .into_iter()
707 .map(|db_worktree| {
708 (
709 db_worktree.id as u64,
710 Worktree {
711 id: db_worktree.id as u64,
712 abs_path: db_worktree.abs_path,
713 root_name: db_worktree.root_name,
714 visible: db_worktree.visible,
715 entries: Default::default(),
716 repository_entries: Default::default(),
717 diagnostic_summaries: Default::default(),
718 settings_files: Default::default(),
719 scan_id: db_worktree.scan_id as u64,
720 completed_scan_id: db_worktree.completed_scan_id as u64,
721 },
722 )
723 })
724 .collect::<BTreeMap<_, _>>();
725
726 // Populate worktree entries.
727 {
728 let mut db_entries = worktree_entry::Entity::find()
729 .filter(
730 Condition::all()
731 .add(worktree_entry::Column::ProjectId.eq(project.id))
732 .add(worktree_entry::Column::IsDeleted.eq(false)),
733 )
734 .stream(tx)
735 .await?;
736 while let Some(db_entry) = db_entries.next().await {
737 let db_entry = db_entry?;
738 if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
739 worktree.entries.push(proto::Entry {
740 id: db_entry.id as u64,
741 is_dir: db_entry.is_dir,
742 path: db_entry.path,
743 inode: db_entry.inode as u64,
744 mtime: Some(proto::Timestamp {
745 seconds: db_entry.mtime_seconds as u64,
746 nanos: db_entry.mtime_nanos as u32,
747 }),
748 canonical_path: db_entry.canonical_path,
749 is_ignored: db_entry.is_ignored,
750 is_external: db_entry.is_external,
751 git_status: db_entry.git_status.map(|status| status as i32),
752 // This is only used in the summarization backlog, so if it's None,
753 // that just means we won't be able to detect when to resummarize
754 // based on total number of backlogged bytes - instead, we'd go
755 // on number of files only. That shouldn't be a huge deal in practice.
756 size: None,
757 is_fifo: db_entry.is_fifo,
758 });
759 }
760 }
761 }
762
763 // Populate repository entries.
764 {
765 let mut db_repository_entries = worktree_repository::Entity::find()
766 .filter(
767 Condition::all()
768 .add(worktree_repository::Column::ProjectId.eq(project.id))
769 .add(worktree_repository::Column::IsDeleted.eq(false)),
770 )
771 .stream(tx)
772 .await?;
773 while let Some(db_repository_entry) = db_repository_entries.next().await {
774 let db_repository_entry = db_repository_entry?;
775 if let Some(worktree) = worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
776 {
777 worktree.repository_entries.insert(
778 db_repository_entry.work_directory_id as u64,
779 proto::RepositoryEntry {
780 work_directory_id: db_repository_entry.work_directory_id as u64,
781 branch: db_repository_entry.branch,
782 },
783 );
784 }
785 }
786 }
787
788 // Populate worktree diagnostic summaries.
789 {
790 let mut db_summaries = worktree_diagnostic_summary::Entity::find()
791 .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
792 .stream(tx)
793 .await?;
794 while let Some(db_summary) = db_summaries.next().await {
795 let db_summary = db_summary?;
796 if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
797 worktree
798 .diagnostic_summaries
799 .push(proto::DiagnosticSummary {
800 path: db_summary.path,
801 language_server_id: db_summary.language_server_id as u64,
802 error_count: db_summary.error_count as u32,
803 warning_count: db_summary.warning_count as u32,
804 });
805 }
806 }
807 }
808
809 // Populate worktree settings files
810 {
811 let mut db_settings_files = worktree_settings_file::Entity::find()
812 .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
813 .stream(tx)
814 .await?;
815 while let Some(db_settings_file) = db_settings_files.next().await {
816 let db_settings_file = db_settings_file?;
817 if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
818 worktree.settings_files.push(WorktreeSettingsFile {
819 path: db_settings_file.path,
820 content: db_settings_file.content,
821 kind: db_settings_file.kind,
822 });
823 }
824 }
825 }
826
827 // Populate language servers.
828 let language_servers = project
829 .find_related(language_server::Entity)
830 .all(tx)
831 .await?;
832
833 let project = Project {
834 id: project.id,
835 role,
836 collaborators: collaborators
837 .into_iter()
838 .map(|collaborator| ProjectCollaborator {
839 connection_id: collaborator.connection(),
840 user_id: collaborator.user_id,
841 replica_id: collaborator.replica_id,
842 is_host: collaborator.is_host,
843 })
844 .collect(),
845 worktrees,
846 language_servers: language_servers
847 .into_iter()
848 .map(|language_server| proto::LanguageServer {
849 id: language_server.id as u64,
850 name: language_server.name,
851 worktree_id: None,
852 })
853 .collect(),
854 dev_server_project_id: project.dev_server_project_id,
855 };
856 Ok((project, replica_id as ReplicaId))
857 }
858
859 pub async fn leave_hosted_project(
860 &self,
861 project_id: ProjectId,
862 connection: ConnectionId,
863 ) -> Result<LeftProject> {
864 self.transaction(|tx| async move {
865 let result = project_collaborator::Entity::delete_many()
866 .filter(
867 Condition::all()
868 .add(project_collaborator::Column::ProjectId.eq(project_id))
869 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
870 .add(
871 project_collaborator::Column::ConnectionServerId
872 .eq(connection.owner_id as i32),
873 ),
874 )
875 .exec(&*tx)
876 .await?;
877 if result.rows_affected == 0 {
878 return Err(anyhow!("not in the project"))?;
879 }
880
881 let project = project::Entity::find_by_id(project_id)
882 .one(&*tx)
883 .await?
884 .ok_or_else(|| anyhow!("no such project"))?;
885 let collaborators = project
886 .find_related(project_collaborator::Entity)
887 .all(&*tx)
888 .await?;
889 let connection_ids = collaborators
890 .into_iter()
891 .map(|collaborator| collaborator.connection())
892 .collect();
893 Ok(LeftProject {
894 id: project.id,
895 connection_ids,
896 should_unshare: false,
897 })
898 })
899 .await
900 }
901
902 /// Removes the given connection from the specified project.
903 pub async fn leave_project(
904 &self,
905 project_id: ProjectId,
906 connection: ConnectionId,
907 ) -> Result<TransactionGuard<(Option<proto::Room>, LeftProject)>> {
908 self.project_transaction(project_id, |tx| async move {
909 let result = project_collaborator::Entity::delete_many()
910 .filter(
911 Condition::all()
912 .add(project_collaborator::Column::ProjectId.eq(project_id))
913 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
914 .add(
915 project_collaborator::Column::ConnectionServerId
916 .eq(connection.owner_id as i32),
917 ),
918 )
919 .exec(&*tx)
920 .await?;
921 if result.rows_affected == 0 {
922 Err(anyhow!("not a collaborator on this project"))?;
923 }
924
925 let project = project::Entity::find_by_id(project_id)
926 .one(&*tx)
927 .await?
928 .ok_or_else(|| anyhow!("no such project"))?;
929 let collaborators = project
930 .find_related(project_collaborator::Entity)
931 .all(&*tx)
932 .await?;
933 let connection_ids: Vec<ConnectionId> = collaborators
934 .into_iter()
935 .map(|collaborator| collaborator.connection())
936 .collect();
937
938 follower::Entity::delete_many()
939 .filter(
940 Condition::any()
941 .add(
942 Condition::all()
943 .add(follower::Column::ProjectId.eq(Some(project_id)))
944 .add(
945 follower::Column::LeaderConnectionServerId
946 .eq(connection.owner_id),
947 )
948 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
949 )
950 .add(
951 Condition::all()
952 .add(follower::Column::ProjectId.eq(Some(project_id)))
953 .add(
954 follower::Column::FollowerConnectionServerId
955 .eq(connection.owner_id),
956 )
957 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
958 ),
959 )
960 .exec(&*tx)
961 .await?;
962
963 let room = if let Some(room_id) = project.room_id {
964 Some(self.get_room(room_id, &tx).await?)
965 } else {
966 None
967 };
968
969 let left_project = LeftProject {
970 id: project_id,
971 should_unshare: connection == project.host_connection()?,
972 connection_ids,
973 };
974 Ok((room, left_project))
975 })
976 .await
977 }
978
979 pub async fn check_user_is_project_host(
980 &self,
981 project_id: ProjectId,
982 connection_id: ConnectionId,
983 ) -> Result<()> {
984 self.project_transaction(project_id, |tx| async move {
985 project::Entity::find()
986 .filter(
987 Condition::all()
988 .add(project::Column::Id.eq(project_id))
989 .add(project::Column::HostConnectionId.eq(Some(connection_id.id as i32)))
990 .add(
991 project::Column::HostConnectionServerId
992 .eq(Some(connection_id.owner_id as i32)),
993 ),
994 )
995 .one(&*tx)
996 .await?
997 .ok_or_else(|| anyhow!("failed to read project host"))?;
998
999 Ok(())
1000 })
1001 .await
1002 .map(|guard| guard.into_inner())
1003 }
1004
1005 /// Returns the current project if the given user is authorized to access it with the specified capability.
1006 pub async fn access_project(
1007 &self,
1008 project_id: ProjectId,
1009 connection_id: ConnectionId,
1010 principal_id: PrincipalId,
1011 capability: Capability,
1012 tx: &DatabaseTransaction,
1013 ) -> Result<(project::Model, ChannelRole)> {
1014 let (mut project, dev_server_project) = project::Entity::find_by_id(project_id)
1015 .find_also_related(dev_server_project::Entity)
1016 .one(tx)
1017 .await?
1018 .ok_or_else(|| anyhow!("no such project"))?;
1019
1020 let user_id = match principal_id {
1021 PrincipalId::DevServerId(_) => {
1022 if project
1023 .host_connection()
1024 .is_ok_and(|connection| connection == connection_id)
1025 {
1026 return Ok((project, ChannelRole::Admin));
1027 }
1028 return Err(anyhow!("not the project host"))?;
1029 }
1030 PrincipalId::UserId(user_id) => user_id,
1031 };
1032
1033 let role_from_room = if let Some(room_id) = project.room_id {
1034 room_participant::Entity::find()
1035 .filter(room_participant::Column::RoomId.eq(room_id))
1036 .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
1037 .one(tx)
1038 .await?
1039 .and_then(|participant| participant.role)
1040 } else {
1041 None
1042 };
1043 let role_from_dev_server = if let Some(dev_server_project) = dev_server_project {
1044 let dev_server = dev_server::Entity::find_by_id(dev_server_project.dev_server_id)
1045 .one(tx)
1046 .await?
1047 .ok_or_else(|| anyhow!("no such channel"))?;
1048 if user_id == dev_server.user_id {
1049 // If the user left the room "uncleanly" they may rejoin the
1050 // remote project before leave_room runs. IN that case kick
1051 // the project out of the room pre-emptively.
1052 if role_from_room.is_none() {
1053 project = project::Entity::update(project::ActiveModel {
1054 room_id: ActiveValue::Set(None),
1055 ..project.into_active_model()
1056 })
1057 .exec(tx)
1058 .await?;
1059 }
1060 Some(ChannelRole::Admin)
1061 } else {
1062 None
1063 }
1064 } else {
1065 None
1066 };
1067
1068 let role = role_from_dev_server
1069 .or(role_from_room)
1070 .unwrap_or(ChannelRole::Banned);
1071
1072 match capability {
1073 Capability::ReadWrite => {
1074 if !role.can_edit_projects() {
1075 return Err(anyhow!("not authorized to edit projects"))?;
1076 }
1077 }
1078 Capability::ReadOnly => {
1079 if !role.can_read_projects() {
1080 return Err(anyhow!("not authorized to read projects"))?;
1081 }
1082 }
1083 }
1084
1085 Ok((project, role))
1086 }
1087
1088 /// Returns the host connection for a read-only request to join a shared project.
1089 pub async fn host_for_read_only_project_request(
1090 &self,
1091 project_id: ProjectId,
1092 connection_id: ConnectionId,
1093 user_id: UserId,
1094 ) -> Result<ConnectionId> {
1095 self.project_transaction(project_id, |tx| async move {
1096 let (project, _) = self
1097 .access_project(
1098 project_id,
1099 connection_id,
1100 PrincipalId::UserId(user_id),
1101 Capability::ReadOnly,
1102 &tx,
1103 )
1104 .await?;
1105 project.host_connection()
1106 })
1107 .await
1108 .map(|guard| guard.into_inner())
1109 }
1110
1111 /// Returns the host connection for a request to join a shared project.
1112 pub async fn host_for_mutating_project_request(
1113 &self,
1114 project_id: ProjectId,
1115 connection_id: ConnectionId,
1116 user_id: UserId,
1117 ) -> Result<ConnectionId> {
1118 self.project_transaction(project_id, |tx| async move {
1119 let (project, _) = self
1120 .access_project(
1121 project_id,
1122 connection_id,
1123 PrincipalId::UserId(user_id),
1124 Capability::ReadWrite,
1125 &tx,
1126 )
1127 .await?;
1128 project.host_connection()
1129 })
1130 .await
1131 .map(|guard| guard.into_inner())
1132 }
1133
1134 /// Returns the host connection for a request to join a shared project.
1135 pub async fn host_for_owner_project_request(
1136 &self,
1137 project_id: ProjectId,
1138 _connection_id: ConnectionId,
1139 user_id: UserId,
1140 ) -> Result<ConnectionId> {
1141 self.project_transaction(project_id, |tx| async move {
1142 let (project, dev_server_project) = project::Entity::find_by_id(project_id)
1143 .find_also_related(dev_server_project::Entity)
1144 .one(&*tx)
1145 .await?
1146 .ok_or_else(|| anyhow!("no such project"))?;
1147
1148 let Some(dev_server_project) = dev_server_project else {
1149 return Err(anyhow!("not a dev server project"))?;
1150 };
1151 let dev_server = dev_server::Entity::find_by_id(dev_server_project.dev_server_id)
1152 .one(&*tx)
1153 .await?
1154 .ok_or_else(|| anyhow!("no such dev server"))?;
1155 if dev_server.user_id != user_id {
1156 return Err(anyhow!("not your project"))?;
1157 }
1158 project.host_connection()
1159 })
1160 .await
1161 .map(|guard| guard.into_inner())
1162 }
1163
1164 pub async fn connections_for_buffer_update(
1165 &self,
1166 project_id: ProjectId,
1167 principal_id: PrincipalId,
1168 connection_id: ConnectionId,
1169 capability: Capability,
1170 ) -> Result<TransactionGuard<(ConnectionId, Vec<ConnectionId>)>> {
1171 self.project_transaction(project_id, |tx| async move {
1172 // Authorize
1173 let (project, _) = self
1174 .access_project(project_id, connection_id, principal_id, capability, &tx)
1175 .await?;
1176
1177 let host_connection_id = project.host_connection()?;
1178
1179 let collaborators = project_collaborator::Entity::find()
1180 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1181 .all(&*tx)
1182 .await?;
1183
1184 let guest_connection_ids = collaborators
1185 .into_iter()
1186 .filter_map(|collaborator| {
1187 if collaborator.is_host {
1188 None
1189 } else {
1190 Some(collaborator.connection())
1191 }
1192 })
1193 .collect();
1194
1195 Ok((host_connection_id, guest_connection_ids))
1196 })
1197 .await
1198 }
1199
1200 /// Returns the connection IDs in the given project.
1201 ///
1202 /// The provided `connection_id` must also be a collaborator in the project,
1203 /// otherwise an error will be returned.
1204 pub async fn project_connection_ids(
1205 &self,
1206 project_id: ProjectId,
1207 connection_id: ConnectionId,
1208 exclude_dev_server: bool,
1209 ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1210 self.project_transaction(project_id, |tx| async move {
1211 let project = project::Entity::find_by_id(project_id)
1212 .one(&*tx)
1213 .await?
1214 .ok_or_else(|| anyhow!("no such project"))?;
1215
1216 let mut collaborators = project_collaborator::Entity::find()
1217 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1218 .stream(&*tx)
1219 .await?;
1220
1221 let mut connection_ids = HashSet::default();
1222 if let Some(host_connection) = project.host_connection().log_err() {
1223 if !exclude_dev_server {
1224 connection_ids.insert(host_connection);
1225 }
1226 }
1227
1228 while let Some(collaborator) = collaborators.next().await {
1229 let collaborator = collaborator?;
1230 connection_ids.insert(collaborator.connection());
1231 }
1232
1233 if connection_ids.contains(&connection_id)
1234 || Some(connection_id) == project.host_connection().ok()
1235 {
1236 Ok(connection_ids)
1237 } else {
1238 Err(anyhow!(
1239 "can only send project updates to a project you're in"
1240 ))?
1241 }
1242 })
1243 .await
1244 }
1245
1246 async fn project_guest_connection_ids(
1247 &self,
1248 project_id: ProjectId,
1249 tx: &DatabaseTransaction,
1250 ) -> Result<Vec<ConnectionId>> {
1251 let mut collaborators = project_collaborator::Entity::find()
1252 .filter(
1253 project_collaborator::Column::ProjectId
1254 .eq(project_id)
1255 .and(project_collaborator::Column::IsHost.eq(false)),
1256 )
1257 .stream(tx)
1258 .await?;
1259
1260 let mut guest_connection_ids = Vec::new();
1261 while let Some(collaborator) = collaborators.next().await {
1262 let collaborator = collaborator?;
1263 guest_connection_ids.push(collaborator.connection());
1264 }
1265 Ok(guest_connection_ids)
1266 }
1267
1268 /// Returns the [`RoomId`] for the given project.
1269 pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<Option<RoomId>> {
1270 self.transaction(|tx| async move {
1271 Ok(project::Entity::find_by_id(project_id)
1272 .one(&*tx)
1273 .await?
1274 .and_then(|project| project.room_id))
1275 })
1276 .await
1277 }
1278
1279 pub async fn check_room_participants(
1280 &self,
1281 room_id: RoomId,
1282 leader_id: ConnectionId,
1283 follower_id: ConnectionId,
1284 ) -> Result<()> {
1285 self.transaction(|tx| async move {
1286 use room_participant::Column;
1287
1288 let count = room_participant::Entity::find()
1289 .filter(
1290 Condition::all().add(Column::RoomId.eq(room_id)).add(
1291 Condition::any()
1292 .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1293 Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1294 ))
1295 .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1296 Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1297 )),
1298 ),
1299 )
1300 .count(&*tx)
1301 .await?;
1302
1303 if count < 2 {
1304 Err(anyhow!("not room participants"))?;
1305 }
1306
1307 Ok(())
1308 })
1309 .await
1310 }
1311
1312 /// Adds the given follower connection as a follower of the given leader connection.
1313 pub async fn follow(
1314 &self,
1315 room_id: RoomId,
1316 project_id: ProjectId,
1317 leader_connection: ConnectionId,
1318 follower_connection: ConnectionId,
1319 ) -> Result<TransactionGuard<proto::Room>> {
1320 self.room_transaction(room_id, |tx| async move {
1321 follower::ActiveModel {
1322 room_id: ActiveValue::set(room_id),
1323 project_id: ActiveValue::set(project_id),
1324 leader_connection_server_id: ActiveValue::set(ServerId(
1325 leader_connection.owner_id as i32,
1326 )),
1327 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1328 follower_connection_server_id: ActiveValue::set(ServerId(
1329 follower_connection.owner_id as i32,
1330 )),
1331 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1332 ..Default::default()
1333 }
1334 .insert(&*tx)
1335 .await?;
1336
1337 let room = self.get_room(room_id, &tx).await?;
1338 Ok(room)
1339 })
1340 .await
1341 }
1342
1343 /// Removes the given follower connection as a follower of the given leader connection.
1344 pub async fn unfollow(
1345 &self,
1346 room_id: RoomId,
1347 project_id: ProjectId,
1348 leader_connection: ConnectionId,
1349 follower_connection: ConnectionId,
1350 ) -> Result<TransactionGuard<proto::Room>> {
1351 self.room_transaction(room_id, |tx| async move {
1352 follower::Entity::delete_many()
1353 .filter(
1354 Condition::all()
1355 .add(follower::Column::RoomId.eq(room_id))
1356 .add(follower::Column::ProjectId.eq(project_id))
1357 .add(
1358 follower::Column::LeaderConnectionServerId
1359 .eq(leader_connection.owner_id),
1360 )
1361 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1362 .add(
1363 follower::Column::FollowerConnectionServerId
1364 .eq(follower_connection.owner_id),
1365 )
1366 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1367 )
1368 .exec(&*tx)
1369 .await?;
1370
1371 let room = self.get_room(room_id, &tx).await?;
1372 Ok(room)
1373 })
1374 .await
1375 }
1376}