1use super::*;
2
3impl Database {
4 /// Returns the count of all projects, excluding ones marked as admin.
5 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
6 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
7 enum QueryAs {
8 Count,
9 }
10
11 self.transaction(|tx| async move {
12 Ok(project::Entity::find()
13 .select_only()
14 .column_as(project::Column::Id.count(), QueryAs::Count)
15 .inner_join(user::Entity)
16 .filter(user::Column::Admin.eq(false))
17 .into_values::<_, QueryAs>()
18 .one(&*tx)
19 .await?
20 .unwrap_or(0i64) as usize)
21 })
22 .await
23 }
24
25 /// Shares a project with the given room.
26 pub async fn share_project(
27 &self,
28 room_id: RoomId,
29 connection: ConnectionId,
30 worktrees: &[proto::WorktreeMetadata],
31 ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
32 self.room_transaction(room_id, |tx| async move {
33 let participant = room_participant::Entity::find()
34 .filter(
35 Condition::all()
36 .add(
37 room_participant::Column::AnsweringConnectionId
38 .eq(connection.id as i32),
39 )
40 .add(
41 room_participant::Column::AnsweringConnectionServerId
42 .eq(connection.owner_id as i32),
43 ),
44 )
45 .one(&*tx)
46 .await?
47 .ok_or_else(|| anyhow!("could not find participant"))?;
48 if participant.room_id != room_id {
49 return Err(anyhow!("shared project on unexpected room"))?;
50 }
51 if !participant
52 .role
53 .unwrap_or(ChannelRole::Member)
54 .can_edit_projects()
55 {
56 return Err(anyhow!("guests cannot share projects"))?;
57 }
58
59 let project = project::ActiveModel {
60 room_id: ActiveValue::set(participant.room_id),
61 host_user_id: ActiveValue::set(participant.user_id),
62 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
63 host_connection_server_id: ActiveValue::set(Some(ServerId(
64 connection.owner_id as i32,
65 ))),
66 ..Default::default()
67 }
68 .insert(&*tx)
69 .await?;
70
71 if !worktrees.is_empty() {
72 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
73 worktree::ActiveModel {
74 id: ActiveValue::set(worktree.id as i64),
75 project_id: ActiveValue::set(project.id),
76 abs_path: ActiveValue::set(worktree.abs_path.clone()),
77 root_name: ActiveValue::set(worktree.root_name.clone()),
78 visible: ActiveValue::set(worktree.visible),
79 scan_id: ActiveValue::set(0),
80 completed_scan_id: ActiveValue::set(0),
81 }
82 }))
83 .exec(&*tx)
84 .await?;
85 }
86
87 project_collaborator::ActiveModel {
88 project_id: ActiveValue::set(project.id),
89 connection_id: ActiveValue::set(connection.id as i32),
90 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
91 user_id: ActiveValue::set(participant.user_id),
92 replica_id: ActiveValue::set(ReplicaId(0)),
93 is_host: ActiveValue::set(true),
94 ..Default::default()
95 }
96 .insert(&*tx)
97 .await?;
98
99 let room = self.get_room(room_id, &tx).await?;
100 Ok((project.id, room))
101 })
102 .await
103 }
104
105 /// Unshares the given project.
106 pub async fn unshare_project(
107 &self,
108 project_id: ProjectId,
109 connection: ConnectionId,
110 ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
111 let room_id = self.room_id_for_project(project_id).await?;
112 self.room_transaction(room_id, |tx| async move {
113 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
114
115 let project = project::Entity::find_by_id(project_id)
116 .one(&*tx)
117 .await?
118 .ok_or_else(|| anyhow!("project not found"))?;
119 if project.host_connection()? == connection {
120 project::Entity::delete(project.into_active_model())
121 .exec(&*tx)
122 .await?;
123 let room = self.get_room(room_id, &tx).await?;
124 Ok((room, guest_connection_ids))
125 } else {
126 Err(anyhow!("cannot unshare a project hosted by another user"))?
127 }
128 })
129 .await
130 }
131
132 /// Updates the worktrees associated with the given project.
133 pub async fn update_project(
134 &self,
135 project_id: ProjectId,
136 connection: ConnectionId,
137 worktrees: &[proto::WorktreeMetadata],
138 ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
139 let room_id = self.room_id_for_project(project_id).await?;
140 self.room_transaction(room_id, |tx| async move {
141 let project = project::Entity::find_by_id(project_id)
142 .filter(
143 Condition::all()
144 .add(project::Column::HostConnectionId.eq(connection.id as i32))
145 .add(
146 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
147 ),
148 )
149 .one(&*tx)
150 .await?
151 .ok_or_else(|| anyhow!("no such project"))?;
152
153 self.update_project_worktrees(project.id, worktrees, &tx)
154 .await?;
155
156 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
157 let room = self.get_room(project.room_id, &tx).await?;
158 Ok((room, guest_connection_ids))
159 })
160 .await
161 }
162
163 pub(in crate::db) async fn update_project_worktrees(
164 &self,
165 project_id: ProjectId,
166 worktrees: &[proto::WorktreeMetadata],
167 tx: &DatabaseTransaction,
168 ) -> Result<()> {
169 if !worktrees.is_empty() {
170 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
171 id: ActiveValue::set(worktree.id as i64),
172 project_id: ActiveValue::set(project_id),
173 abs_path: ActiveValue::set(worktree.abs_path.clone()),
174 root_name: ActiveValue::set(worktree.root_name.clone()),
175 visible: ActiveValue::set(worktree.visible),
176 scan_id: ActiveValue::set(0),
177 completed_scan_id: ActiveValue::set(0),
178 }))
179 .on_conflict(
180 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
181 .update_column(worktree::Column::RootName)
182 .to_owned(),
183 )
184 .exec(&*tx)
185 .await?;
186 }
187
188 worktree::Entity::delete_many()
189 .filter(worktree::Column::ProjectId.eq(project_id).and(
190 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
191 ))
192 .exec(&*tx)
193 .await?;
194
195 Ok(())
196 }
197
198 pub async fn update_worktree(
199 &self,
200 update: &proto::UpdateWorktree,
201 connection: ConnectionId,
202 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
203 let project_id = ProjectId::from_proto(update.project_id);
204 let worktree_id = update.worktree_id as i64;
205 let room_id = self.room_id_for_project(project_id).await?;
206 self.room_transaction(room_id, |tx| async move {
207 // Ensure the update comes from the host.
208 let _project = project::Entity::find_by_id(project_id)
209 .filter(
210 Condition::all()
211 .add(project::Column::HostConnectionId.eq(connection.id as i32))
212 .add(
213 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
214 ),
215 )
216 .one(&*tx)
217 .await?
218 .ok_or_else(|| anyhow!("no such project"))?;
219
220 // Update metadata.
221 worktree::Entity::update(worktree::ActiveModel {
222 id: ActiveValue::set(worktree_id),
223 project_id: ActiveValue::set(project_id),
224 root_name: ActiveValue::set(update.root_name.clone()),
225 scan_id: ActiveValue::set(update.scan_id as i64),
226 completed_scan_id: if update.is_last_update {
227 ActiveValue::set(update.scan_id as i64)
228 } else {
229 ActiveValue::default()
230 },
231 abs_path: ActiveValue::set(update.abs_path.clone()),
232 ..Default::default()
233 })
234 .exec(&*tx)
235 .await?;
236
237 if !update.updated_entries.is_empty() {
238 worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
239 let mtime = entry.mtime.clone().unwrap_or_default();
240 worktree_entry::ActiveModel {
241 project_id: ActiveValue::set(project_id),
242 worktree_id: ActiveValue::set(worktree_id),
243 id: ActiveValue::set(entry.id as i64),
244 is_dir: ActiveValue::set(entry.is_dir),
245 path: ActiveValue::set(entry.path.clone()),
246 inode: ActiveValue::set(entry.inode as i64),
247 mtime_seconds: ActiveValue::set(mtime.seconds as i64),
248 mtime_nanos: ActiveValue::set(mtime.nanos as i32),
249 is_symlink: ActiveValue::set(entry.is_symlink),
250 is_ignored: ActiveValue::set(entry.is_ignored),
251 is_external: ActiveValue::set(entry.is_external),
252 git_status: ActiveValue::set(entry.git_status.map(|status| status as i64)),
253 is_deleted: ActiveValue::set(false),
254 scan_id: ActiveValue::set(update.scan_id as i64),
255 }
256 }))
257 .on_conflict(
258 OnConflict::columns([
259 worktree_entry::Column::ProjectId,
260 worktree_entry::Column::WorktreeId,
261 worktree_entry::Column::Id,
262 ])
263 .update_columns([
264 worktree_entry::Column::IsDir,
265 worktree_entry::Column::Path,
266 worktree_entry::Column::Inode,
267 worktree_entry::Column::MtimeSeconds,
268 worktree_entry::Column::MtimeNanos,
269 worktree_entry::Column::IsSymlink,
270 worktree_entry::Column::IsIgnored,
271 worktree_entry::Column::GitStatus,
272 worktree_entry::Column::ScanId,
273 ])
274 .to_owned(),
275 )
276 .exec(&*tx)
277 .await?;
278 }
279
280 if !update.removed_entries.is_empty() {
281 worktree_entry::Entity::update_many()
282 .filter(
283 worktree_entry::Column::ProjectId
284 .eq(project_id)
285 .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
286 .and(
287 worktree_entry::Column::Id
288 .is_in(update.removed_entries.iter().map(|id| *id as i64)),
289 ),
290 )
291 .set(worktree_entry::ActiveModel {
292 is_deleted: ActiveValue::Set(true),
293 scan_id: ActiveValue::Set(update.scan_id as i64),
294 ..Default::default()
295 })
296 .exec(&*tx)
297 .await?;
298 }
299
300 if !update.updated_repositories.is_empty() {
301 worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
302 |repository| worktree_repository::ActiveModel {
303 project_id: ActiveValue::set(project_id),
304 worktree_id: ActiveValue::set(worktree_id),
305 work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
306 scan_id: ActiveValue::set(update.scan_id as i64),
307 branch: ActiveValue::set(repository.branch.clone()),
308 is_deleted: ActiveValue::set(false),
309 },
310 ))
311 .on_conflict(
312 OnConflict::columns([
313 worktree_repository::Column::ProjectId,
314 worktree_repository::Column::WorktreeId,
315 worktree_repository::Column::WorkDirectoryId,
316 ])
317 .update_columns([
318 worktree_repository::Column::ScanId,
319 worktree_repository::Column::Branch,
320 ])
321 .to_owned(),
322 )
323 .exec(&*tx)
324 .await?;
325 }
326
327 if !update.removed_repositories.is_empty() {
328 worktree_repository::Entity::update_many()
329 .filter(
330 worktree_repository::Column::ProjectId
331 .eq(project_id)
332 .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
333 .and(
334 worktree_repository::Column::WorkDirectoryId
335 .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
336 ),
337 )
338 .set(worktree_repository::ActiveModel {
339 is_deleted: ActiveValue::Set(true),
340 scan_id: ActiveValue::Set(update.scan_id as i64),
341 ..Default::default()
342 })
343 .exec(&*tx)
344 .await?;
345 }
346
347 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
348 Ok(connection_ids)
349 })
350 .await
351 }
352
353 /// Updates the diagnostic summary for the given connection.
354 pub async fn update_diagnostic_summary(
355 &self,
356 update: &proto::UpdateDiagnosticSummary,
357 connection: ConnectionId,
358 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
359 let project_id = ProjectId::from_proto(update.project_id);
360 let worktree_id = update.worktree_id as i64;
361 let room_id = self.room_id_for_project(project_id).await?;
362 self.room_transaction(room_id, |tx| async move {
363 let summary = update
364 .summary
365 .as_ref()
366 .ok_or_else(|| anyhow!("invalid summary"))?;
367
368 // Ensure the update comes from the host.
369 let project = project::Entity::find_by_id(project_id)
370 .one(&*tx)
371 .await?
372 .ok_or_else(|| anyhow!("no such project"))?;
373 if project.host_connection()? != connection {
374 return Err(anyhow!("can't update a project hosted by someone else"))?;
375 }
376
377 // Update summary.
378 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
379 project_id: ActiveValue::set(project_id),
380 worktree_id: ActiveValue::set(worktree_id),
381 path: ActiveValue::set(summary.path.clone()),
382 language_server_id: ActiveValue::set(summary.language_server_id as i64),
383 error_count: ActiveValue::set(summary.error_count as i32),
384 warning_count: ActiveValue::set(summary.warning_count as i32),
385 })
386 .on_conflict(
387 OnConflict::columns([
388 worktree_diagnostic_summary::Column::ProjectId,
389 worktree_diagnostic_summary::Column::WorktreeId,
390 worktree_diagnostic_summary::Column::Path,
391 ])
392 .update_columns([
393 worktree_diagnostic_summary::Column::LanguageServerId,
394 worktree_diagnostic_summary::Column::ErrorCount,
395 worktree_diagnostic_summary::Column::WarningCount,
396 ])
397 .to_owned(),
398 )
399 .exec(&*tx)
400 .await?;
401
402 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
403 Ok(connection_ids)
404 })
405 .await
406 }
407
408 /// Starts the language server for the given connection.
409 pub async fn start_language_server(
410 &self,
411 update: &proto::StartLanguageServer,
412 connection: ConnectionId,
413 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
414 let project_id = ProjectId::from_proto(update.project_id);
415 let room_id = self.room_id_for_project(project_id).await?;
416 self.room_transaction(room_id, |tx| async move {
417 let server = update
418 .server
419 .as_ref()
420 .ok_or_else(|| anyhow!("invalid language server"))?;
421
422 // Ensure the update comes from the host.
423 let project = project::Entity::find_by_id(project_id)
424 .one(&*tx)
425 .await?
426 .ok_or_else(|| anyhow!("no such project"))?;
427 if project.host_connection()? != connection {
428 return Err(anyhow!("can't update a project hosted by someone else"))?;
429 }
430
431 // Add the newly-started language server.
432 language_server::Entity::insert(language_server::ActiveModel {
433 project_id: ActiveValue::set(project_id),
434 id: ActiveValue::set(server.id as i64),
435 name: ActiveValue::set(server.name.clone()),
436 })
437 .on_conflict(
438 OnConflict::columns([
439 language_server::Column::ProjectId,
440 language_server::Column::Id,
441 ])
442 .update_column(language_server::Column::Name)
443 .to_owned(),
444 )
445 .exec(&*tx)
446 .await?;
447
448 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
449 Ok(connection_ids)
450 })
451 .await
452 }
453
454 /// Updates the worktree settings for the given connection.
455 pub async fn update_worktree_settings(
456 &self,
457 update: &proto::UpdateWorktreeSettings,
458 connection: ConnectionId,
459 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
460 let project_id = ProjectId::from_proto(update.project_id);
461 let room_id = self.room_id_for_project(project_id).await?;
462 self.room_transaction(room_id, |tx| async move {
463 // Ensure the update comes from the host.
464 let project = project::Entity::find_by_id(project_id)
465 .one(&*tx)
466 .await?
467 .ok_or_else(|| anyhow!("no such project"))?;
468 if project.host_connection()? != connection {
469 return Err(anyhow!("can't update a project hosted by someone else"))?;
470 }
471
472 if let Some(content) = &update.content {
473 worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
474 project_id: ActiveValue::Set(project_id),
475 worktree_id: ActiveValue::Set(update.worktree_id as i64),
476 path: ActiveValue::Set(update.path.clone()),
477 content: ActiveValue::Set(content.clone()),
478 })
479 .on_conflict(
480 OnConflict::columns([
481 worktree_settings_file::Column::ProjectId,
482 worktree_settings_file::Column::WorktreeId,
483 worktree_settings_file::Column::Path,
484 ])
485 .update_column(worktree_settings_file::Column::Content)
486 .to_owned(),
487 )
488 .exec(&*tx)
489 .await?;
490 } else {
491 worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
492 project_id: ActiveValue::Set(project_id),
493 worktree_id: ActiveValue::Set(update.worktree_id as i64),
494 path: ActiveValue::Set(update.path.clone()),
495 ..Default::default()
496 })
497 .exec(&*tx)
498 .await?;
499 }
500
501 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
502 Ok(connection_ids)
503 })
504 .await
505 }
506
507 /// Adds the given connection to the specified project.
508 pub async fn join_project(
509 &self,
510 project_id: ProjectId,
511 connection: ConnectionId,
512 ) -> Result<RoomGuard<(Project, ReplicaId)>> {
513 let room_id = self.room_id_for_project(project_id).await?;
514 self.room_transaction(room_id, |tx| async move {
515 let participant = room_participant::Entity::find()
516 .filter(
517 Condition::all()
518 .add(
519 room_participant::Column::AnsweringConnectionId
520 .eq(connection.id as i32),
521 )
522 .add(
523 room_participant::Column::AnsweringConnectionServerId
524 .eq(connection.owner_id as i32),
525 ),
526 )
527 .one(&*tx)
528 .await?
529 .ok_or_else(|| anyhow!("must join a room first"))?;
530
531 let project = project::Entity::find_by_id(project_id)
532 .one(&*tx)
533 .await?
534 .ok_or_else(|| anyhow!("no such project"))?;
535 if project.room_id != participant.room_id {
536 return Err(anyhow!("no such project"))?;
537 }
538
539 let mut collaborators = project
540 .find_related(project_collaborator::Entity)
541 .all(&*tx)
542 .await?;
543 let replica_ids = collaborators
544 .iter()
545 .map(|c| c.replica_id)
546 .collect::<HashSet<_>>();
547 let mut replica_id = ReplicaId(1);
548 while replica_ids.contains(&replica_id) {
549 replica_id.0 += 1;
550 }
551 let new_collaborator = project_collaborator::ActiveModel {
552 project_id: ActiveValue::set(project_id),
553 connection_id: ActiveValue::set(connection.id as i32),
554 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
555 user_id: ActiveValue::set(participant.user_id),
556 replica_id: ActiveValue::set(replica_id),
557 is_host: ActiveValue::set(false),
558 ..Default::default()
559 }
560 .insert(&*tx)
561 .await?;
562 collaborators.push(new_collaborator);
563
564 let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
565 let mut worktrees = db_worktrees
566 .into_iter()
567 .map(|db_worktree| {
568 (
569 db_worktree.id as u64,
570 Worktree {
571 id: db_worktree.id as u64,
572 abs_path: db_worktree.abs_path,
573 root_name: db_worktree.root_name,
574 visible: db_worktree.visible,
575 entries: Default::default(),
576 repository_entries: Default::default(),
577 diagnostic_summaries: Default::default(),
578 settings_files: Default::default(),
579 scan_id: db_worktree.scan_id as u64,
580 completed_scan_id: db_worktree.completed_scan_id as u64,
581 },
582 )
583 })
584 .collect::<BTreeMap<_, _>>();
585
586 // Populate worktree entries.
587 {
588 let mut db_entries = worktree_entry::Entity::find()
589 .filter(
590 Condition::all()
591 .add(worktree_entry::Column::ProjectId.eq(project_id))
592 .add(worktree_entry::Column::IsDeleted.eq(false)),
593 )
594 .stream(&*tx)
595 .await?;
596 while let Some(db_entry) = db_entries.next().await {
597 let db_entry = db_entry?;
598 if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
599 worktree.entries.push(proto::Entry {
600 id: db_entry.id as u64,
601 is_dir: db_entry.is_dir,
602 path: db_entry.path,
603 inode: db_entry.inode as u64,
604 mtime: Some(proto::Timestamp {
605 seconds: db_entry.mtime_seconds as u64,
606 nanos: db_entry.mtime_nanos as u32,
607 }),
608 is_symlink: db_entry.is_symlink,
609 is_ignored: db_entry.is_ignored,
610 is_external: db_entry.is_external,
611 git_status: db_entry.git_status.map(|status| status as i32),
612 });
613 }
614 }
615 }
616
617 // Populate repository entries.
618 {
619 let mut db_repository_entries = worktree_repository::Entity::find()
620 .filter(
621 Condition::all()
622 .add(worktree_repository::Column::ProjectId.eq(project_id))
623 .add(worktree_repository::Column::IsDeleted.eq(false)),
624 )
625 .stream(&*tx)
626 .await?;
627 while let Some(db_repository_entry) = db_repository_entries.next().await {
628 let db_repository_entry = db_repository_entry?;
629 if let Some(worktree) =
630 worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
631 {
632 worktree.repository_entries.insert(
633 db_repository_entry.work_directory_id as u64,
634 proto::RepositoryEntry {
635 work_directory_id: db_repository_entry.work_directory_id as u64,
636 branch: db_repository_entry.branch,
637 },
638 );
639 }
640 }
641 }
642
643 // Populate worktree diagnostic summaries.
644 {
645 let mut db_summaries = worktree_diagnostic_summary::Entity::find()
646 .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
647 .stream(&*tx)
648 .await?;
649 while let Some(db_summary) = db_summaries.next().await {
650 let db_summary = db_summary?;
651 if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
652 worktree
653 .diagnostic_summaries
654 .push(proto::DiagnosticSummary {
655 path: db_summary.path,
656 language_server_id: db_summary.language_server_id as u64,
657 error_count: db_summary.error_count as u32,
658 warning_count: db_summary.warning_count as u32,
659 });
660 }
661 }
662 }
663
664 // Populate worktree settings files
665 {
666 let mut db_settings_files = worktree_settings_file::Entity::find()
667 .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
668 .stream(&*tx)
669 .await?;
670 while let Some(db_settings_file) = db_settings_files.next().await {
671 let db_settings_file = db_settings_file?;
672 if let Some(worktree) =
673 worktrees.get_mut(&(db_settings_file.worktree_id as u64))
674 {
675 worktree.settings_files.push(WorktreeSettingsFile {
676 path: db_settings_file.path,
677 content: db_settings_file.content,
678 });
679 }
680 }
681 }
682
683 // Populate language servers.
684 let language_servers = project
685 .find_related(language_server::Entity)
686 .all(&*tx)
687 .await?;
688
689 let project = Project {
690 collaborators: collaborators
691 .into_iter()
692 .map(|collaborator| ProjectCollaborator {
693 connection_id: collaborator.connection(),
694 user_id: collaborator.user_id,
695 replica_id: collaborator.replica_id,
696 is_host: collaborator.is_host,
697 })
698 .collect(),
699 worktrees,
700 language_servers: language_servers
701 .into_iter()
702 .map(|language_server| proto::LanguageServer {
703 id: language_server.id as u64,
704 name: language_server.name,
705 })
706 .collect(),
707 };
708 Ok((project, replica_id as ReplicaId))
709 })
710 .await
711 }
712
713 /// Removes the given connection from the specified project.
714 pub async fn leave_project(
715 &self,
716 project_id: ProjectId,
717 connection: ConnectionId,
718 ) -> Result<RoomGuard<(proto::Room, LeftProject)>> {
719 let room_id = self.room_id_for_project(project_id).await?;
720 self.room_transaction(room_id, |tx| async move {
721 let result = project_collaborator::Entity::delete_many()
722 .filter(
723 Condition::all()
724 .add(project_collaborator::Column::ProjectId.eq(project_id))
725 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
726 .add(
727 project_collaborator::Column::ConnectionServerId
728 .eq(connection.owner_id as i32),
729 ),
730 )
731 .exec(&*tx)
732 .await?;
733 if result.rows_affected == 0 {
734 Err(anyhow!("not a collaborator on this project"))?;
735 }
736
737 let project = project::Entity::find_by_id(project_id)
738 .one(&*tx)
739 .await?
740 .ok_or_else(|| anyhow!("no such project"))?;
741 let collaborators = project
742 .find_related(project_collaborator::Entity)
743 .all(&*tx)
744 .await?;
745 let connection_ids = collaborators
746 .into_iter()
747 .map(|collaborator| collaborator.connection())
748 .collect();
749
750 follower::Entity::delete_many()
751 .filter(
752 Condition::any()
753 .add(
754 Condition::all()
755 .add(follower::Column::ProjectId.eq(Some(project_id)))
756 .add(
757 follower::Column::LeaderConnectionServerId
758 .eq(connection.owner_id),
759 )
760 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
761 )
762 .add(
763 Condition::all()
764 .add(follower::Column::ProjectId.eq(Some(project_id)))
765 .add(
766 follower::Column::FollowerConnectionServerId
767 .eq(connection.owner_id),
768 )
769 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
770 ),
771 )
772 .exec(&*tx)
773 .await?;
774
775 let room = self.get_room(project.room_id, &tx).await?;
776 let left_project = LeftProject {
777 id: project_id,
778 host_user_id: project.host_user_id,
779 host_connection_id: Some(project.host_connection()?),
780 connection_ids,
781 };
782 Ok((room, left_project))
783 })
784 .await
785 }
786
787 pub async fn check_user_is_project_host(
788 &self,
789 project_id: ProjectId,
790 connection_id: ConnectionId,
791 ) -> Result<()> {
792 let room_id = self.room_id_for_project(project_id).await?;
793 self.room_transaction(room_id, |tx| async move {
794 project_collaborator::Entity::find()
795 .filter(
796 Condition::all()
797 .add(project_collaborator::Column::ProjectId.eq(project_id))
798 .add(project_collaborator::Column::IsHost.eq(true))
799 .add(project_collaborator::Column::ConnectionId.eq(connection_id.id))
800 .add(
801 project_collaborator::Column::ConnectionServerId
802 .eq(connection_id.owner_id),
803 ),
804 )
805 .one(&*tx)
806 .await?
807 .ok_or_else(|| anyhow!("failed to read project host"))?;
808
809 Ok(())
810 })
811 .await
812 .map(|guard| guard.into_inner())
813 }
814
815 /// Returns the host connection for a read-only request to join a shared project.
816 pub async fn host_for_read_only_project_request(
817 &self,
818 project_id: ProjectId,
819 connection_id: ConnectionId,
820 ) -> Result<ConnectionId> {
821 let room_id = self.room_id_for_project(project_id).await?;
822 self.room_transaction(room_id, |tx| async move {
823 let current_participant = room_participant::Entity::find()
824 .filter(room_participant::Column::RoomId.eq(room_id))
825 .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
826 .one(&*tx)
827 .await?
828 .ok_or_else(|| anyhow!("no such room"))?;
829
830 if !current_participant
831 .role
832 .map_or(false, |role| role.can_read_projects())
833 {
834 Err(anyhow!("not authorized to read projects"))?;
835 }
836
837 let host = project_collaborator::Entity::find()
838 .filter(
839 project_collaborator::Column::ProjectId
840 .eq(project_id)
841 .and(project_collaborator::Column::IsHost.eq(true)),
842 )
843 .one(&*tx)
844 .await?
845 .ok_or_else(|| anyhow!("failed to read project host"))?;
846
847 Ok(host.connection())
848 })
849 .await
850 .map(|guard| guard.into_inner())
851 }
852
853 /// Returns the host connection for a request to join a shared project.
854 pub async fn host_for_mutating_project_request(
855 &self,
856 project_id: ProjectId,
857 connection_id: ConnectionId,
858 ) -> Result<ConnectionId> {
859 let room_id = self.room_id_for_project(project_id).await?;
860 self.room_transaction(room_id, |tx| async move {
861 let current_participant = room_participant::Entity::find()
862 .filter(room_participant::Column::RoomId.eq(room_id))
863 .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
864 .one(&*tx)
865 .await?
866 .ok_or_else(|| anyhow!("no such room"))?;
867
868 if !current_participant
869 .role
870 .map_or(false, |role| role.can_edit_projects())
871 {
872 Err(anyhow!("not authorized to edit projects"))?;
873 }
874
875 let host = project_collaborator::Entity::find()
876 .filter(
877 project_collaborator::Column::ProjectId
878 .eq(project_id)
879 .and(project_collaborator::Column::IsHost.eq(true)),
880 )
881 .one(&*tx)
882 .await?
883 .ok_or_else(|| anyhow!("failed to read project host"))?;
884
885 Ok(host.connection())
886 })
887 .await
888 .map(|guard| guard.into_inner())
889 }
890
891 pub async fn project_collaborators_for_buffer_update(
892 &self,
893 project_id: ProjectId,
894 connection_id: ConnectionId,
895 requires_write: bool,
896 ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
897 let room_id = self.room_id_for_project(project_id).await?;
898 self.room_transaction(room_id, |tx| async move {
899 let current_participant = room_participant::Entity::find()
900 .filter(room_participant::Column::RoomId.eq(room_id))
901 .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
902 .one(&*tx)
903 .await?
904 .ok_or_else(|| anyhow!("no such room"))?;
905
906 if requires_write
907 && !current_participant
908 .role
909 .map_or(false, |role| role.can_edit_projects())
910 {
911 Err(anyhow!("not authorized to edit projects"))?;
912 }
913
914 let collaborators = project_collaborator::Entity::find()
915 .filter(project_collaborator::Column::ProjectId.eq(project_id))
916 .all(&*tx)
917 .await?
918 .into_iter()
919 .map(|collaborator| ProjectCollaborator {
920 connection_id: collaborator.connection(),
921 user_id: collaborator.user_id,
922 replica_id: collaborator.replica_id,
923 is_host: collaborator.is_host,
924 })
925 .collect::<Vec<_>>();
926
927 if collaborators
928 .iter()
929 .any(|collaborator| collaborator.connection_id == connection_id)
930 {
931 Ok(collaborators)
932 } else {
933 Err(anyhow!("no such project"))?
934 }
935 })
936 .await
937 }
938
939 /// Returns the connection IDs in the given project.
940 ///
941 /// The provided `connection_id` must also be a collaborator in the project,
942 /// otherwise an error will be returned.
943 pub async fn project_connection_ids(
944 &self,
945 project_id: ProjectId,
946 connection_id: ConnectionId,
947 ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
948 let room_id = self.room_id_for_project(project_id).await?;
949 self.room_transaction(room_id, |tx| async move {
950 let mut collaborators = project_collaborator::Entity::find()
951 .filter(project_collaborator::Column::ProjectId.eq(project_id))
952 .stream(&*tx)
953 .await?;
954
955 let mut connection_ids = HashSet::default();
956 while let Some(collaborator) = collaborators.next().await {
957 let collaborator = collaborator?;
958 connection_ids.insert(collaborator.connection());
959 }
960
961 if connection_ids.contains(&connection_id) {
962 Ok(connection_ids)
963 } else {
964 Err(anyhow!("no such project"))?
965 }
966 })
967 .await
968 }
969
970 async fn project_guest_connection_ids(
971 &self,
972 project_id: ProjectId,
973 tx: &DatabaseTransaction,
974 ) -> Result<Vec<ConnectionId>> {
975 let mut collaborators = project_collaborator::Entity::find()
976 .filter(
977 project_collaborator::Column::ProjectId
978 .eq(project_id)
979 .and(project_collaborator::Column::IsHost.eq(false)),
980 )
981 .stream(tx)
982 .await?;
983
984 let mut guest_connection_ids = Vec::new();
985 while let Some(collaborator) = collaborators.next().await {
986 let collaborator = collaborator?;
987 guest_connection_ids.push(collaborator.connection());
988 }
989 Ok(guest_connection_ids)
990 }
991
992 /// Returns the [`RoomId`] for the given project.
993 pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<RoomId> {
994 self.transaction(|tx| async move {
995 let project = project::Entity::find_by_id(project_id)
996 .one(&*tx)
997 .await?
998 .ok_or_else(|| anyhow!("project {} not found", project_id))?;
999 Ok(project.room_id)
1000 })
1001 .await
1002 }
1003
1004 pub async fn check_room_participants(
1005 &self,
1006 room_id: RoomId,
1007 leader_id: ConnectionId,
1008 follower_id: ConnectionId,
1009 ) -> Result<()> {
1010 self.transaction(|tx| async move {
1011 use room_participant::Column;
1012
1013 let count = room_participant::Entity::find()
1014 .filter(
1015 Condition::all().add(Column::RoomId.eq(room_id)).add(
1016 Condition::any()
1017 .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1018 Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1019 ))
1020 .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1021 Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1022 )),
1023 ),
1024 )
1025 .count(&*tx)
1026 .await?;
1027
1028 if count < 2 {
1029 Err(anyhow!("not room participants"))?;
1030 }
1031
1032 Ok(())
1033 })
1034 .await
1035 }
1036
1037 /// Adds the given follower connection as a follower of the given leader connection.
1038 pub async fn follow(
1039 &self,
1040 room_id: RoomId,
1041 project_id: ProjectId,
1042 leader_connection: ConnectionId,
1043 follower_connection: ConnectionId,
1044 ) -> Result<RoomGuard<proto::Room>> {
1045 self.room_transaction(room_id, |tx| async move {
1046 follower::ActiveModel {
1047 room_id: ActiveValue::set(room_id),
1048 project_id: ActiveValue::set(project_id),
1049 leader_connection_server_id: ActiveValue::set(ServerId(
1050 leader_connection.owner_id as i32,
1051 )),
1052 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1053 follower_connection_server_id: ActiveValue::set(ServerId(
1054 follower_connection.owner_id as i32,
1055 )),
1056 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1057 ..Default::default()
1058 }
1059 .insert(&*tx)
1060 .await?;
1061
1062 let room = self.get_room(room_id, &tx).await?;
1063 Ok(room)
1064 })
1065 .await
1066 }
1067
1068 /// Removes the given follower connection as a follower of the given leader connection.
1069 pub async fn unfollow(
1070 &self,
1071 room_id: RoomId,
1072 project_id: ProjectId,
1073 leader_connection: ConnectionId,
1074 follower_connection: ConnectionId,
1075 ) -> Result<RoomGuard<proto::Room>> {
1076 self.room_transaction(room_id, |tx| async move {
1077 follower::Entity::delete_many()
1078 .filter(
1079 Condition::all()
1080 .add(follower::Column::RoomId.eq(room_id))
1081 .add(follower::Column::ProjectId.eq(project_id))
1082 .add(
1083 follower::Column::LeaderConnectionServerId
1084 .eq(leader_connection.owner_id),
1085 )
1086 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1087 .add(
1088 follower::Column::FollowerConnectionServerId
1089 .eq(follower_connection.owner_id),
1090 )
1091 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1092 )
1093 .exec(&*tx)
1094 .await?;
1095
1096 let room = self.get_room(room_id, &tx).await?;
1097 Ok(room)
1098 })
1099 .await
1100 }
1101}