1use super::*;
2
3impl Database {
4 /// Returns the count of all projects, excluding ones marked as admin.
5 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
6 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
7 enum QueryAs {
8 Count,
9 }
10
11 self.transaction(|tx| async move {
12 Ok(project::Entity::find()
13 .select_only()
14 .column_as(project::Column::Id.count(), QueryAs::Count)
15 .inner_join(user::Entity)
16 .filter(user::Column::Admin.eq(false))
17 .into_values::<_, QueryAs>()
18 .one(&*tx)
19 .await?
20 .unwrap_or(0i64) as usize)
21 })
22 .await
23 }
24
25 /// Shares a project with the given room.
26 pub async fn share_project(
27 &self,
28 room_id: RoomId,
29 connection: ConnectionId,
30 worktrees: &[proto::WorktreeMetadata],
31 ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
32 self.room_transaction(room_id, |tx| async move {
33 let participant = room_participant::Entity::find()
34 .filter(
35 Condition::all()
36 .add(
37 room_participant::Column::AnsweringConnectionId
38 .eq(connection.id as i32),
39 )
40 .add(
41 room_participant::Column::AnsweringConnectionServerId
42 .eq(connection.owner_id as i32),
43 ),
44 )
45 .one(&*tx)
46 .await?
47 .ok_or_else(|| anyhow!("could not find participant"))?;
48 if participant.room_id != room_id {
49 return Err(anyhow!("shared project on unexpected room"))?;
50 }
51 if !participant
52 .role
53 .unwrap_or(ChannelRole::Member)
54 .can_edit_projects()
55 {
56 return Err(anyhow!("guests cannot share projects"))?;
57 }
58
59 let project = project::ActiveModel {
60 room_id: ActiveValue::set(Some(participant.room_id)),
61 host_user_id: ActiveValue::set(Some(participant.user_id)),
62 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
63 host_connection_server_id: ActiveValue::set(Some(ServerId(
64 connection.owner_id as i32,
65 ))),
66 id: ActiveValue::NotSet,
67 hosted_project_id: ActiveValue::Set(None),
68 }
69 .insert(&*tx)
70 .await?;
71
72 if !worktrees.is_empty() {
73 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
74 worktree::ActiveModel {
75 id: ActiveValue::set(worktree.id as i64),
76 project_id: ActiveValue::set(project.id),
77 abs_path: ActiveValue::set(worktree.abs_path.clone()),
78 root_name: ActiveValue::set(worktree.root_name.clone()),
79 visible: ActiveValue::set(worktree.visible),
80 scan_id: ActiveValue::set(0),
81 completed_scan_id: ActiveValue::set(0),
82 }
83 }))
84 .exec(&*tx)
85 .await?;
86 }
87
88 project_collaborator::ActiveModel {
89 project_id: ActiveValue::set(project.id),
90 connection_id: ActiveValue::set(connection.id as i32),
91 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
92 user_id: ActiveValue::set(participant.user_id),
93 replica_id: ActiveValue::set(ReplicaId(0)),
94 is_host: ActiveValue::set(true),
95 ..Default::default()
96 }
97 .insert(&*tx)
98 .await?;
99
100 let room = self.get_room(room_id, &tx).await?;
101 Ok((project.id, room))
102 })
103 .await
104 }
105
106 /// Unshares the given project.
107 pub async fn unshare_project(
108 &self,
109 project_id: ProjectId,
110 connection: ConnectionId,
111 ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
112 let room_id = self.room_id_for_project(project_id).await?;
113 self.room_transaction(room_id, |tx| async move {
114 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
115
116 let project = project::Entity::find_by_id(project_id)
117 .one(&*tx)
118 .await?
119 .ok_or_else(|| anyhow!("project not found"))?;
120 if project.host_connection()? == connection {
121 project::Entity::delete(project.into_active_model())
122 .exec(&*tx)
123 .await?;
124 let room = self.get_room(room_id, &tx).await?;
125 Ok((room, guest_connection_ids))
126 } else {
127 Err(anyhow!("cannot unshare a project hosted by another user"))?
128 }
129 })
130 .await
131 }
132
133 /// Updates the worktrees associated with the given project.
134 pub async fn update_project(
135 &self,
136 project_id: ProjectId,
137 connection: ConnectionId,
138 worktrees: &[proto::WorktreeMetadata],
139 ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
140 let room_id = self.room_id_for_project(project_id).await?;
141 self.room_transaction(room_id, |tx| async move {
142 let project = project::Entity::find_by_id(project_id)
143 .filter(
144 Condition::all()
145 .add(project::Column::HostConnectionId.eq(connection.id as i32))
146 .add(
147 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
148 ),
149 )
150 .one(&*tx)
151 .await?
152 .ok_or_else(|| anyhow!("no such project"))?;
153
154 self.update_project_worktrees(project.id, worktrees, &tx)
155 .await?;
156
157 let room_id = project
158 .room_id
159 .ok_or_else(|| anyhow!("project not in a room"))?;
160
161 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
162 let room = self.get_room(room_id, &tx).await?;
163 Ok((room, guest_connection_ids))
164 })
165 .await
166 }
167
168 pub(in crate::db) async fn update_project_worktrees(
169 &self,
170 project_id: ProjectId,
171 worktrees: &[proto::WorktreeMetadata],
172 tx: &DatabaseTransaction,
173 ) -> Result<()> {
174 if !worktrees.is_empty() {
175 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
176 id: ActiveValue::set(worktree.id as i64),
177 project_id: ActiveValue::set(project_id),
178 abs_path: ActiveValue::set(worktree.abs_path.clone()),
179 root_name: ActiveValue::set(worktree.root_name.clone()),
180 visible: ActiveValue::set(worktree.visible),
181 scan_id: ActiveValue::set(0),
182 completed_scan_id: ActiveValue::set(0),
183 }))
184 .on_conflict(
185 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
186 .update_column(worktree::Column::RootName)
187 .to_owned(),
188 )
189 .exec(tx)
190 .await?;
191 }
192
193 worktree::Entity::delete_many()
194 .filter(worktree::Column::ProjectId.eq(project_id).and(
195 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
196 ))
197 .exec(tx)
198 .await?;
199
200 Ok(())
201 }
202
203 pub async fn update_worktree(
204 &self,
205 update: &proto::UpdateWorktree,
206 connection: ConnectionId,
207 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
208 let project_id = ProjectId::from_proto(update.project_id);
209 let worktree_id = update.worktree_id as i64;
210 let room_id = self.room_id_for_project(project_id).await?;
211 self.room_transaction(room_id, |tx| async move {
212 // Ensure the update comes from the host.
213 let _project = project::Entity::find_by_id(project_id)
214 .filter(
215 Condition::all()
216 .add(project::Column::HostConnectionId.eq(connection.id as i32))
217 .add(
218 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
219 ),
220 )
221 .one(&*tx)
222 .await?
223 .ok_or_else(|| anyhow!("no such project"))?;
224
225 // Update metadata.
226 worktree::Entity::update(worktree::ActiveModel {
227 id: ActiveValue::set(worktree_id),
228 project_id: ActiveValue::set(project_id),
229 root_name: ActiveValue::set(update.root_name.clone()),
230 scan_id: ActiveValue::set(update.scan_id as i64),
231 completed_scan_id: if update.is_last_update {
232 ActiveValue::set(update.scan_id as i64)
233 } else {
234 ActiveValue::default()
235 },
236 abs_path: ActiveValue::set(update.abs_path.clone()),
237 ..Default::default()
238 })
239 .exec(&*tx)
240 .await?;
241
242 if !update.updated_entries.is_empty() {
243 worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
244 let mtime = entry.mtime.clone().unwrap_or_default();
245 worktree_entry::ActiveModel {
246 project_id: ActiveValue::set(project_id),
247 worktree_id: ActiveValue::set(worktree_id),
248 id: ActiveValue::set(entry.id as i64),
249 is_dir: ActiveValue::set(entry.is_dir),
250 path: ActiveValue::set(entry.path.clone()),
251 inode: ActiveValue::set(entry.inode as i64),
252 mtime_seconds: ActiveValue::set(mtime.seconds as i64),
253 mtime_nanos: ActiveValue::set(mtime.nanos as i32),
254 is_symlink: ActiveValue::set(entry.is_symlink),
255 is_ignored: ActiveValue::set(entry.is_ignored),
256 is_external: ActiveValue::set(entry.is_external),
257 git_status: ActiveValue::set(entry.git_status.map(|status| status as i64)),
258 is_deleted: ActiveValue::set(false),
259 scan_id: ActiveValue::set(update.scan_id as i64),
260 }
261 }))
262 .on_conflict(
263 OnConflict::columns([
264 worktree_entry::Column::ProjectId,
265 worktree_entry::Column::WorktreeId,
266 worktree_entry::Column::Id,
267 ])
268 .update_columns([
269 worktree_entry::Column::IsDir,
270 worktree_entry::Column::Path,
271 worktree_entry::Column::Inode,
272 worktree_entry::Column::MtimeSeconds,
273 worktree_entry::Column::MtimeNanos,
274 worktree_entry::Column::IsSymlink,
275 worktree_entry::Column::IsIgnored,
276 worktree_entry::Column::GitStatus,
277 worktree_entry::Column::ScanId,
278 ])
279 .to_owned(),
280 )
281 .exec(&*tx)
282 .await?;
283 }
284
285 if !update.removed_entries.is_empty() {
286 worktree_entry::Entity::update_many()
287 .filter(
288 worktree_entry::Column::ProjectId
289 .eq(project_id)
290 .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
291 .and(
292 worktree_entry::Column::Id
293 .is_in(update.removed_entries.iter().map(|id| *id as i64)),
294 ),
295 )
296 .set(worktree_entry::ActiveModel {
297 is_deleted: ActiveValue::Set(true),
298 scan_id: ActiveValue::Set(update.scan_id as i64),
299 ..Default::default()
300 })
301 .exec(&*tx)
302 .await?;
303 }
304
305 if !update.updated_repositories.is_empty() {
306 worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
307 |repository| worktree_repository::ActiveModel {
308 project_id: ActiveValue::set(project_id),
309 worktree_id: ActiveValue::set(worktree_id),
310 work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
311 scan_id: ActiveValue::set(update.scan_id as i64),
312 branch: ActiveValue::set(repository.branch.clone()),
313 is_deleted: ActiveValue::set(false),
314 },
315 ))
316 .on_conflict(
317 OnConflict::columns([
318 worktree_repository::Column::ProjectId,
319 worktree_repository::Column::WorktreeId,
320 worktree_repository::Column::WorkDirectoryId,
321 ])
322 .update_columns([
323 worktree_repository::Column::ScanId,
324 worktree_repository::Column::Branch,
325 ])
326 .to_owned(),
327 )
328 .exec(&*tx)
329 .await?;
330 }
331
332 if !update.removed_repositories.is_empty() {
333 worktree_repository::Entity::update_many()
334 .filter(
335 worktree_repository::Column::ProjectId
336 .eq(project_id)
337 .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
338 .and(
339 worktree_repository::Column::WorkDirectoryId
340 .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
341 ),
342 )
343 .set(worktree_repository::ActiveModel {
344 is_deleted: ActiveValue::Set(true),
345 scan_id: ActiveValue::Set(update.scan_id as i64),
346 ..Default::default()
347 })
348 .exec(&*tx)
349 .await?;
350 }
351
352 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
353 Ok(connection_ids)
354 })
355 .await
356 }
357
358 /// Updates the diagnostic summary for the given connection.
359 pub async fn update_diagnostic_summary(
360 &self,
361 update: &proto::UpdateDiagnosticSummary,
362 connection: ConnectionId,
363 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
364 let project_id = ProjectId::from_proto(update.project_id);
365 let worktree_id = update.worktree_id as i64;
366 let room_id = self.room_id_for_project(project_id).await?;
367 self.room_transaction(room_id, |tx| async move {
368 let summary = update
369 .summary
370 .as_ref()
371 .ok_or_else(|| anyhow!("invalid summary"))?;
372
373 // Ensure the update comes from the host.
374 let project = project::Entity::find_by_id(project_id)
375 .one(&*tx)
376 .await?
377 .ok_or_else(|| anyhow!("no such project"))?;
378 if project.host_connection()? != connection {
379 return Err(anyhow!("can't update a project hosted by someone else"))?;
380 }
381
382 // Update summary.
383 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
384 project_id: ActiveValue::set(project_id),
385 worktree_id: ActiveValue::set(worktree_id),
386 path: ActiveValue::set(summary.path.clone()),
387 language_server_id: ActiveValue::set(summary.language_server_id as i64),
388 error_count: ActiveValue::set(summary.error_count as i32),
389 warning_count: ActiveValue::set(summary.warning_count as i32),
390 })
391 .on_conflict(
392 OnConflict::columns([
393 worktree_diagnostic_summary::Column::ProjectId,
394 worktree_diagnostic_summary::Column::WorktreeId,
395 worktree_diagnostic_summary::Column::Path,
396 ])
397 .update_columns([
398 worktree_diagnostic_summary::Column::LanguageServerId,
399 worktree_diagnostic_summary::Column::ErrorCount,
400 worktree_diagnostic_summary::Column::WarningCount,
401 ])
402 .to_owned(),
403 )
404 .exec(&*tx)
405 .await?;
406
407 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
408 Ok(connection_ids)
409 })
410 .await
411 }
412
413 /// Starts the language server for the given connection.
414 pub async fn start_language_server(
415 &self,
416 update: &proto::StartLanguageServer,
417 connection: ConnectionId,
418 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
419 let project_id = ProjectId::from_proto(update.project_id);
420 let room_id = self.room_id_for_project(project_id).await?;
421 self.room_transaction(room_id, |tx| async move {
422 let server = update
423 .server
424 .as_ref()
425 .ok_or_else(|| anyhow!("invalid language server"))?;
426
427 // Ensure the update comes from the host.
428 let project = project::Entity::find_by_id(project_id)
429 .one(&*tx)
430 .await?
431 .ok_or_else(|| anyhow!("no such project"))?;
432 if project.host_connection()? != connection {
433 return Err(anyhow!("can't update a project hosted by someone else"))?;
434 }
435
436 // Add the newly-started language server.
437 language_server::Entity::insert(language_server::ActiveModel {
438 project_id: ActiveValue::set(project_id),
439 id: ActiveValue::set(server.id as i64),
440 name: ActiveValue::set(server.name.clone()),
441 })
442 .on_conflict(
443 OnConflict::columns([
444 language_server::Column::ProjectId,
445 language_server::Column::Id,
446 ])
447 .update_column(language_server::Column::Name)
448 .to_owned(),
449 )
450 .exec(&*tx)
451 .await?;
452
453 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
454 Ok(connection_ids)
455 })
456 .await
457 }
458
459 /// Updates the worktree settings for the given connection.
460 pub async fn update_worktree_settings(
461 &self,
462 update: &proto::UpdateWorktreeSettings,
463 connection: ConnectionId,
464 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
465 let project_id = ProjectId::from_proto(update.project_id);
466 let room_id = self.room_id_for_project(project_id).await?;
467 self.room_transaction(room_id, |tx| async move {
468 // Ensure the update comes from the host.
469 let project = project::Entity::find_by_id(project_id)
470 .one(&*tx)
471 .await?
472 .ok_or_else(|| anyhow!("no such project"))?;
473 if project.host_connection()? != connection {
474 return Err(anyhow!("can't update a project hosted by someone else"))?;
475 }
476
477 if let Some(content) = &update.content {
478 worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
479 project_id: ActiveValue::Set(project_id),
480 worktree_id: ActiveValue::Set(update.worktree_id as i64),
481 path: ActiveValue::Set(update.path.clone()),
482 content: ActiveValue::Set(content.clone()),
483 })
484 .on_conflict(
485 OnConflict::columns([
486 worktree_settings_file::Column::ProjectId,
487 worktree_settings_file::Column::WorktreeId,
488 worktree_settings_file::Column::Path,
489 ])
490 .update_column(worktree_settings_file::Column::Content)
491 .to_owned(),
492 )
493 .exec(&*tx)
494 .await?;
495 } else {
496 worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
497 project_id: ActiveValue::Set(project_id),
498 worktree_id: ActiveValue::Set(update.worktree_id as i64),
499 path: ActiveValue::Set(update.path.clone()),
500 ..Default::default()
501 })
502 .exec(&*tx)
503 .await?;
504 }
505
506 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
507 Ok(connection_ids)
508 })
509 .await
510 }
511
512 /// Adds the given connection to the specified hosted project
513 pub async fn join_hosted_project(
514 &self,
515 id: HostedProjectId,
516 user_id: UserId,
517 connection: ConnectionId,
518 ) -> Result<(Project, ReplicaId)> {
519 self.transaction(|tx| async move {
520 let (hosted_project, role) = self.get_hosted_project(id, user_id, &tx).await?;
521 let project = project::Entity::find()
522 .filter(project::Column::HostedProjectId.eq(hosted_project.id))
523 .one(&*tx)
524 .await?
525 .ok_or_else(|| anyhow!("hosted project is no longer shared"))?;
526
527 self.join_project_internal(project, user_id, connection, role, &tx)
528 .await
529 })
530 .await
531 }
532
533 /// Adds the given connection to the specified project
534 /// in the current room.
535 pub async fn join_project_in_room(
536 &self,
537 project_id: ProjectId,
538 connection: ConnectionId,
539 ) -> Result<RoomGuard<(Project, ReplicaId)>> {
540 let room_id = self.room_id_for_project(project_id).await?;
541 self.room_transaction(room_id, |tx| async move {
542 let participant = room_participant::Entity::find()
543 .filter(
544 Condition::all()
545 .add(
546 room_participant::Column::AnsweringConnectionId
547 .eq(connection.id as i32),
548 )
549 .add(
550 room_participant::Column::AnsweringConnectionServerId
551 .eq(connection.owner_id as i32),
552 ),
553 )
554 .one(&*tx)
555 .await?
556 .ok_or_else(|| anyhow!("must join a room first"))?;
557
558 let project = project::Entity::find_by_id(project_id)
559 .one(&*tx)
560 .await?
561 .ok_or_else(|| anyhow!("no such project"))?;
562 if project.room_id != Some(participant.room_id) {
563 return Err(anyhow!("no such project"))?;
564 }
565 self.join_project_internal(
566 project,
567 participant.user_id,
568 connection,
569 participant.role.unwrap_or(ChannelRole::Member),
570 &tx,
571 )
572 .await
573 })
574 .await
575 }
576
577 async fn join_project_internal(
578 &self,
579 project: project::Model,
580 user_id: UserId,
581 connection: ConnectionId,
582 role: ChannelRole,
583 tx: &DatabaseTransaction,
584 ) -> Result<(Project, ReplicaId)> {
585 let mut collaborators = project
586 .find_related(project_collaborator::Entity)
587 .all(tx)
588 .await?;
589 let replica_ids = collaborators
590 .iter()
591 .map(|c| c.replica_id)
592 .collect::<HashSet<_>>();
593 let mut replica_id = ReplicaId(1);
594 while replica_ids.contains(&replica_id) {
595 replica_id.0 += 1;
596 }
597 let new_collaborator = project_collaborator::ActiveModel {
598 project_id: ActiveValue::set(project.id),
599 connection_id: ActiveValue::set(connection.id as i32),
600 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
601 user_id: ActiveValue::set(user_id),
602 replica_id: ActiveValue::set(replica_id),
603 is_host: ActiveValue::set(false),
604 ..Default::default()
605 }
606 .insert(tx)
607 .await?;
608 collaborators.push(new_collaborator);
609
610 let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
611 let mut worktrees = db_worktrees
612 .into_iter()
613 .map(|db_worktree| {
614 (
615 db_worktree.id as u64,
616 Worktree {
617 id: db_worktree.id as u64,
618 abs_path: db_worktree.abs_path,
619 root_name: db_worktree.root_name,
620 visible: db_worktree.visible,
621 entries: Default::default(),
622 repository_entries: Default::default(),
623 diagnostic_summaries: Default::default(),
624 settings_files: Default::default(),
625 scan_id: db_worktree.scan_id as u64,
626 completed_scan_id: db_worktree.completed_scan_id as u64,
627 },
628 )
629 })
630 .collect::<BTreeMap<_, _>>();
631
632 // Populate worktree entries.
633 {
634 let mut db_entries = worktree_entry::Entity::find()
635 .filter(
636 Condition::all()
637 .add(worktree_entry::Column::ProjectId.eq(project.id))
638 .add(worktree_entry::Column::IsDeleted.eq(false)),
639 )
640 .stream(tx)
641 .await?;
642 while let Some(db_entry) = db_entries.next().await {
643 let db_entry = db_entry?;
644 if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
645 worktree.entries.push(proto::Entry {
646 id: db_entry.id as u64,
647 is_dir: db_entry.is_dir,
648 path: db_entry.path,
649 inode: db_entry.inode as u64,
650 mtime: Some(proto::Timestamp {
651 seconds: db_entry.mtime_seconds as u64,
652 nanos: db_entry.mtime_nanos as u32,
653 }),
654 is_symlink: db_entry.is_symlink,
655 is_ignored: db_entry.is_ignored,
656 is_external: db_entry.is_external,
657 git_status: db_entry.git_status.map(|status| status as i32),
658 });
659 }
660 }
661 }
662
663 // Populate repository entries.
664 {
665 let mut db_repository_entries = worktree_repository::Entity::find()
666 .filter(
667 Condition::all()
668 .add(worktree_repository::Column::ProjectId.eq(project.id))
669 .add(worktree_repository::Column::IsDeleted.eq(false)),
670 )
671 .stream(tx)
672 .await?;
673 while let Some(db_repository_entry) = db_repository_entries.next().await {
674 let db_repository_entry = db_repository_entry?;
675 if let Some(worktree) = worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
676 {
677 worktree.repository_entries.insert(
678 db_repository_entry.work_directory_id as u64,
679 proto::RepositoryEntry {
680 work_directory_id: db_repository_entry.work_directory_id as u64,
681 branch: db_repository_entry.branch,
682 },
683 );
684 }
685 }
686 }
687
688 // Populate worktree diagnostic summaries.
689 {
690 let mut db_summaries = worktree_diagnostic_summary::Entity::find()
691 .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project.id))
692 .stream(tx)
693 .await?;
694 while let Some(db_summary) = db_summaries.next().await {
695 let db_summary = db_summary?;
696 if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
697 worktree
698 .diagnostic_summaries
699 .push(proto::DiagnosticSummary {
700 path: db_summary.path,
701 language_server_id: db_summary.language_server_id as u64,
702 error_count: db_summary.error_count as u32,
703 warning_count: db_summary.warning_count as u32,
704 });
705 }
706 }
707 }
708
709 // Populate worktree settings files
710 {
711 let mut db_settings_files = worktree_settings_file::Entity::find()
712 .filter(worktree_settings_file::Column::ProjectId.eq(project.id))
713 .stream(tx)
714 .await?;
715 while let Some(db_settings_file) = db_settings_files.next().await {
716 let db_settings_file = db_settings_file?;
717 if let Some(worktree) = worktrees.get_mut(&(db_settings_file.worktree_id as u64)) {
718 worktree.settings_files.push(WorktreeSettingsFile {
719 path: db_settings_file.path,
720 content: db_settings_file.content,
721 });
722 }
723 }
724 }
725
726 // Populate language servers.
727 let language_servers = project
728 .find_related(language_server::Entity)
729 .all(tx)
730 .await?;
731
732 let project = Project {
733 id: project.id,
734 role,
735 collaborators: collaborators
736 .into_iter()
737 .map(|collaborator| ProjectCollaborator {
738 connection_id: collaborator.connection(),
739 user_id: collaborator.user_id,
740 replica_id: collaborator.replica_id,
741 is_host: collaborator.is_host,
742 })
743 .collect(),
744 worktrees,
745 language_servers: language_servers
746 .into_iter()
747 .map(|language_server| proto::LanguageServer {
748 id: language_server.id as u64,
749 name: language_server.name,
750 })
751 .collect(),
752 };
753 Ok((project, replica_id as ReplicaId))
754 }
755
756 pub async fn leave_hosted_project(
757 &self,
758 project_id: ProjectId,
759 connection: ConnectionId,
760 ) -> Result<LeftProject> {
761 self.transaction(|tx| async move {
762 let result = project_collaborator::Entity::delete_many()
763 .filter(
764 Condition::all()
765 .add(project_collaborator::Column::ProjectId.eq(project_id))
766 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
767 .add(
768 project_collaborator::Column::ConnectionServerId
769 .eq(connection.owner_id as i32),
770 ),
771 )
772 .exec(&*tx)
773 .await?;
774 if result.rows_affected == 0 {
775 return Err(anyhow!("not in the project"))?;
776 }
777
778 let project = project::Entity::find_by_id(project_id)
779 .one(&*tx)
780 .await?
781 .ok_or_else(|| anyhow!("no such project"))?;
782 let collaborators = project
783 .find_related(project_collaborator::Entity)
784 .all(&*tx)
785 .await?;
786 let connection_ids = collaborators
787 .into_iter()
788 .map(|collaborator| collaborator.connection())
789 .collect();
790 Ok(LeftProject {
791 id: project.id,
792 connection_ids,
793 host_user_id: None,
794 host_connection_id: None,
795 })
796 })
797 .await
798 }
799
800 /// Removes the given connection from the specified project.
801 pub async fn leave_project(
802 &self,
803 project_id: ProjectId,
804 connection: ConnectionId,
805 ) -> Result<RoomGuard<(proto::Room, LeftProject)>> {
806 let room_id = self.room_id_for_project(project_id).await?;
807 self.room_transaction(room_id, |tx| async move {
808 let result = project_collaborator::Entity::delete_many()
809 .filter(
810 Condition::all()
811 .add(project_collaborator::Column::ProjectId.eq(project_id))
812 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
813 .add(
814 project_collaborator::Column::ConnectionServerId
815 .eq(connection.owner_id as i32),
816 ),
817 )
818 .exec(&*tx)
819 .await?;
820 if result.rows_affected == 0 {
821 Err(anyhow!("not a collaborator on this project"))?;
822 }
823
824 let project = project::Entity::find_by_id(project_id)
825 .one(&*tx)
826 .await?
827 .ok_or_else(|| anyhow!("no such project"))?;
828 let collaborators = project
829 .find_related(project_collaborator::Entity)
830 .all(&*tx)
831 .await?;
832 let connection_ids = collaborators
833 .into_iter()
834 .map(|collaborator| collaborator.connection())
835 .collect();
836
837 follower::Entity::delete_many()
838 .filter(
839 Condition::any()
840 .add(
841 Condition::all()
842 .add(follower::Column::ProjectId.eq(Some(project_id)))
843 .add(
844 follower::Column::LeaderConnectionServerId
845 .eq(connection.owner_id),
846 )
847 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
848 )
849 .add(
850 Condition::all()
851 .add(follower::Column::ProjectId.eq(Some(project_id)))
852 .add(
853 follower::Column::FollowerConnectionServerId
854 .eq(connection.owner_id),
855 )
856 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
857 ),
858 )
859 .exec(&*tx)
860 .await?;
861
862 let room = self.get_room(room_id, &tx).await?;
863 let left_project = LeftProject {
864 id: project_id,
865 host_user_id: project.host_user_id,
866 host_connection_id: Some(project.host_connection()?),
867 connection_ids,
868 };
869 Ok((room, left_project))
870 })
871 .await
872 }
873
874 pub async fn check_user_is_project_host(
875 &self,
876 project_id: ProjectId,
877 connection_id: ConnectionId,
878 ) -> Result<()> {
879 let room_id = self.room_id_for_project(project_id).await?;
880 self.room_transaction(room_id, |tx| async move {
881 project_collaborator::Entity::find()
882 .filter(
883 Condition::all()
884 .add(project_collaborator::Column::ProjectId.eq(project_id))
885 .add(project_collaborator::Column::IsHost.eq(true))
886 .add(project_collaborator::Column::ConnectionId.eq(connection_id.id))
887 .add(
888 project_collaborator::Column::ConnectionServerId
889 .eq(connection_id.owner_id),
890 ),
891 )
892 .one(&*tx)
893 .await?
894 .ok_or_else(|| anyhow!("failed to read project host"))?;
895
896 Ok(())
897 })
898 .await
899 .map(|guard| guard.into_inner())
900 }
901
902 /// Returns the host connection for a read-only request to join a shared project.
903 pub async fn host_for_read_only_project_request(
904 &self,
905 project_id: ProjectId,
906 connection_id: ConnectionId,
907 ) -> Result<ConnectionId> {
908 let room_id = self.room_id_for_project(project_id).await?;
909 self.room_transaction(room_id, |tx| async move {
910 let current_participant = room_participant::Entity::find()
911 .filter(room_participant::Column::RoomId.eq(room_id))
912 .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
913 .one(&*tx)
914 .await?
915 .ok_or_else(|| anyhow!("no such room"))?;
916
917 if !current_participant
918 .role
919 .map_or(false, |role| role.can_read_projects())
920 {
921 Err(anyhow!("not authorized to read projects"))?;
922 }
923
924 let host = project_collaborator::Entity::find()
925 .filter(
926 project_collaborator::Column::ProjectId
927 .eq(project_id)
928 .and(project_collaborator::Column::IsHost.eq(true)),
929 )
930 .one(&*tx)
931 .await?
932 .ok_or_else(|| anyhow!("failed to read project host"))?;
933
934 Ok(host.connection())
935 })
936 .await
937 .map(|guard| guard.into_inner())
938 }
939
940 /// Returns the host connection for a request to join a shared project.
941 pub async fn host_for_mutating_project_request(
942 &self,
943 project_id: ProjectId,
944 connection_id: ConnectionId,
945 ) -> Result<ConnectionId> {
946 let room_id = self.room_id_for_project(project_id).await?;
947 self.room_transaction(room_id, |tx| async move {
948 let current_participant = room_participant::Entity::find()
949 .filter(room_participant::Column::RoomId.eq(room_id))
950 .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
951 .one(&*tx)
952 .await?
953 .ok_or_else(|| anyhow!("no such room"))?;
954
955 if !current_participant
956 .role
957 .map_or(false, |role| role.can_edit_projects())
958 {
959 Err(anyhow!("not authorized to edit projects"))?;
960 }
961
962 let host = project_collaborator::Entity::find()
963 .filter(
964 project_collaborator::Column::ProjectId
965 .eq(project_id)
966 .and(project_collaborator::Column::IsHost.eq(true)),
967 )
968 .one(&*tx)
969 .await?
970 .ok_or_else(|| anyhow!("failed to read project host"))?;
971
972 Ok(host.connection())
973 })
974 .await
975 .map(|guard| guard.into_inner())
976 }
977
978 pub async fn project_collaborators_for_buffer_update(
979 &self,
980 project_id: ProjectId,
981 connection_id: ConnectionId,
982 requires_write: bool,
983 ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
984 let room_id = self.room_id_for_project(project_id).await?;
985 self.room_transaction(room_id, |tx| async move {
986 let current_participant = room_participant::Entity::find()
987 .filter(room_participant::Column::RoomId.eq(room_id))
988 .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
989 .one(&*tx)
990 .await?
991 .ok_or_else(|| anyhow!("no such room"))?;
992
993 if requires_write
994 && !current_participant
995 .role
996 .map_or(false, |role| role.can_edit_projects())
997 {
998 Err(anyhow!("not authorized to edit projects"))?;
999 }
1000
1001 let collaborators = project_collaborator::Entity::find()
1002 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1003 .all(&*tx)
1004 .await?
1005 .into_iter()
1006 .map(|collaborator| ProjectCollaborator {
1007 connection_id: collaborator.connection(),
1008 user_id: collaborator.user_id,
1009 replica_id: collaborator.replica_id,
1010 is_host: collaborator.is_host,
1011 })
1012 .collect::<Vec<_>>();
1013
1014 if collaborators
1015 .iter()
1016 .any(|collaborator| collaborator.connection_id == connection_id)
1017 {
1018 Ok(collaborators)
1019 } else {
1020 Err(anyhow!("no such project"))?
1021 }
1022 })
1023 .await
1024 }
1025
1026 /// Returns the connection IDs in the given project.
1027 ///
1028 /// The provided `connection_id` must also be a collaborator in the project,
1029 /// otherwise an error will be returned.
1030 pub async fn project_connection_ids(
1031 &self,
1032 project_id: ProjectId,
1033 connection_id: ConnectionId,
1034 ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
1035 let room_id = self.room_id_for_project(project_id).await?;
1036 self.room_transaction(room_id, |tx| async move {
1037 let mut collaborators = project_collaborator::Entity::find()
1038 .filter(project_collaborator::Column::ProjectId.eq(project_id))
1039 .stream(&*tx)
1040 .await?;
1041
1042 let mut connection_ids = HashSet::default();
1043 while let Some(collaborator) = collaborators.next().await {
1044 let collaborator = collaborator?;
1045 connection_ids.insert(collaborator.connection());
1046 }
1047
1048 if connection_ids.contains(&connection_id) {
1049 Ok(connection_ids)
1050 } else {
1051 Err(anyhow!("no such project"))?
1052 }
1053 })
1054 .await
1055 }
1056
1057 async fn project_guest_connection_ids(
1058 &self,
1059 project_id: ProjectId,
1060 tx: &DatabaseTransaction,
1061 ) -> Result<Vec<ConnectionId>> {
1062 let mut collaborators = project_collaborator::Entity::find()
1063 .filter(
1064 project_collaborator::Column::ProjectId
1065 .eq(project_id)
1066 .and(project_collaborator::Column::IsHost.eq(false)),
1067 )
1068 .stream(tx)
1069 .await?;
1070
1071 let mut guest_connection_ids = Vec::new();
1072 while let Some(collaborator) = collaborators.next().await {
1073 let collaborator = collaborator?;
1074 guest_connection_ids.push(collaborator.connection());
1075 }
1076 Ok(guest_connection_ids)
1077 }
1078
1079 /// Returns the [`RoomId`] for the given project.
1080 pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<RoomId> {
1081 self.transaction(|tx| async move {
1082 let project = project::Entity::find_by_id(project_id)
1083 .one(&*tx)
1084 .await?
1085 .ok_or_else(|| anyhow!("project {} not found", project_id))?;
1086 Ok(project
1087 .room_id
1088 .ok_or_else(|| anyhow!("project not in room"))?)
1089 })
1090 .await
1091 }
1092
1093 pub async fn check_room_participants(
1094 &self,
1095 room_id: RoomId,
1096 leader_id: ConnectionId,
1097 follower_id: ConnectionId,
1098 ) -> Result<()> {
1099 self.transaction(|tx| async move {
1100 use room_participant::Column;
1101
1102 let count = room_participant::Entity::find()
1103 .filter(
1104 Condition::all().add(Column::RoomId.eq(room_id)).add(
1105 Condition::any()
1106 .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1107 Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1108 ))
1109 .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1110 Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1111 )),
1112 ),
1113 )
1114 .count(&*tx)
1115 .await?;
1116
1117 if count < 2 {
1118 Err(anyhow!("not room participants"))?;
1119 }
1120
1121 Ok(())
1122 })
1123 .await
1124 }
1125
1126 /// Adds the given follower connection as a follower of the given leader connection.
1127 pub async fn follow(
1128 &self,
1129 room_id: RoomId,
1130 project_id: ProjectId,
1131 leader_connection: ConnectionId,
1132 follower_connection: ConnectionId,
1133 ) -> Result<RoomGuard<proto::Room>> {
1134 self.room_transaction(room_id, |tx| async move {
1135 follower::ActiveModel {
1136 room_id: ActiveValue::set(room_id),
1137 project_id: ActiveValue::set(project_id),
1138 leader_connection_server_id: ActiveValue::set(ServerId(
1139 leader_connection.owner_id as i32,
1140 )),
1141 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1142 follower_connection_server_id: ActiveValue::set(ServerId(
1143 follower_connection.owner_id as i32,
1144 )),
1145 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1146 ..Default::default()
1147 }
1148 .insert(&*tx)
1149 .await?;
1150
1151 let room = self.get_room(room_id, &tx).await?;
1152 Ok(room)
1153 })
1154 .await
1155 }
1156
1157 /// Removes the given follower connection as a follower of the given leader connection.
1158 pub async fn unfollow(
1159 &self,
1160 room_id: RoomId,
1161 project_id: ProjectId,
1162 leader_connection: ConnectionId,
1163 follower_connection: ConnectionId,
1164 ) -> Result<RoomGuard<proto::Room>> {
1165 self.room_transaction(room_id, |tx| async move {
1166 follower::Entity::delete_many()
1167 .filter(
1168 Condition::all()
1169 .add(follower::Column::RoomId.eq(room_id))
1170 .add(follower::Column::ProjectId.eq(project_id))
1171 .add(
1172 follower::Column::LeaderConnectionServerId
1173 .eq(leader_connection.owner_id),
1174 )
1175 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1176 .add(
1177 follower::Column::FollowerConnectionServerId
1178 .eq(follower_connection.owner_id),
1179 )
1180 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1181 )
1182 .exec(&*tx)
1183 .await?;
1184
1185 let room = self.get_room(room_id, &tx).await?;
1186 Ok(room)
1187 })
1188 .await
1189 }
1190}