1use super::*;
2
3impl Database {
4 /// Returns the count of all projects, excluding ones marked as admin.
5 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
6 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
7 enum QueryAs {
8 Count,
9 }
10
11 self.transaction(|tx| async move {
12 Ok(project::Entity::find()
13 .select_only()
14 .column_as(project::Column::Id.count(), QueryAs::Count)
15 .inner_join(user::Entity)
16 .filter(user::Column::Admin.eq(false))
17 .into_values::<_, QueryAs>()
18 .one(&*tx)
19 .await?
20 .unwrap_or(0i64) as usize)
21 })
22 .await
23 }
24
25 /// Shares a project with the given room.
26 pub async fn share_project(
27 &self,
28 room_id: RoomId,
29 connection: ConnectionId,
30 worktrees: &[proto::WorktreeMetadata],
31 ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
32 self.room_transaction(room_id, |tx| async move {
33 let participant = room_participant::Entity::find()
34 .filter(
35 Condition::all()
36 .add(
37 room_participant::Column::AnsweringConnectionId
38 .eq(connection.id as i32),
39 )
40 .add(
41 room_participant::Column::AnsweringConnectionServerId
42 .eq(connection.owner_id as i32),
43 ),
44 )
45 .one(&*tx)
46 .await?
47 .ok_or_else(|| anyhow!("could not find participant"))?;
48 if participant.room_id != room_id {
49 return Err(anyhow!("shared project on unexpected room"))?;
50 }
51 if !participant
52 .role
53 .unwrap_or(ChannelRole::Member)
54 .can_publish_to_rooms()
55 {
56 return Err(anyhow!("guests cannot share projects"))?;
57 }
58
59 let project = project::ActiveModel {
60 room_id: ActiveValue::set(participant.room_id),
61 host_user_id: ActiveValue::set(participant.user_id),
62 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
63 host_connection_server_id: ActiveValue::set(Some(ServerId(
64 connection.owner_id as i32,
65 ))),
66 ..Default::default()
67 }
68 .insert(&*tx)
69 .await?;
70
71 if !worktrees.is_empty() {
72 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
73 worktree::ActiveModel {
74 id: ActiveValue::set(worktree.id as i64),
75 project_id: ActiveValue::set(project.id),
76 abs_path: ActiveValue::set(worktree.abs_path.clone()),
77 root_name: ActiveValue::set(worktree.root_name.clone()),
78 visible: ActiveValue::set(worktree.visible),
79 scan_id: ActiveValue::set(0),
80 completed_scan_id: ActiveValue::set(0),
81 }
82 }))
83 .exec(&*tx)
84 .await?;
85 }
86
87 project_collaborator::ActiveModel {
88 project_id: ActiveValue::set(project.id),
89 connection_id: ActiveValue::set(connection.id as i32),
90 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
91 user_id: ActiveValue::set(participant.user_id),
92 replica_id: ActiveValue::set(ReplicaId(0)),
93 is_host: ActiveValue::set(true),
94 ..Default::default()
95 }
96 .insert(&*tx)
97 .await?;
98
99 let room = self.get_room(room_id, &tx).await?;
100 Ok((project.id, room))
101 })
102 .await
103 }
104
105 /// Unshares the given project.
106 pub async fn unshare_project(
107 &self,
108 project_id: ProjectId,
109 connection: ConnectionId,
110 ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
111 let room_id = self.room_id_for_project(project_id).await?;
112 self.room_transaction(room_id, |tx| async move {
113 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
114
115 let project = project::Entity::find_by_id(project_id)
116 .one(&*tx)
117 .await?
118 .ok_or_else(|| anyhow!("project not found"))?;
119 if project.host_connection()? == connection {
120 project::Entity::delete(project.into_active_model())
121 .exec(&*tx)
122 .await?;
123 let room = self.get_room(room_id, &tx).await?;
124 Ok((room, guest_connection_ids))
125 } else {
126 Err(anyhow!("cannot unshare a project hosted by another user"))?
127 }
128 })
129 .await
130 }
131
132 /// Updates the worktrees associated with the given project.
133 pub async fn update_project(
134 &self,
135 project_id: ProjectId,
136 connection: ConnectionId,
137 worktrees: &[proto::WorktreeMetadata],
138 ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
139 let room_id = self.room_id_for_project(project_id).await?;
140 self.room_transaction(room_id, |tx| async move {
141 let project = project::Entity::find_by_id(project_id)
142 .filter(
143 Condition::all()
144 .add(project::Column::HostConnectionId.eq(connection.id as i32))
145 .add(
146 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
147 ),
148 )
149 .one(&*tx)
150 .await?
151 .ok_or_else(|| anyhow!("no such project"))?;
152
153 self.update_project_worktrees(project.id, worktrees, &tx)
154 .await?;
155
156 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
157 let room = self.get_room(project.room_id, &tx).await?;
158 Ok((room, guest_connection_ids))
159 })
160 .await
161 }
162
163 pub(in crate::db) async fn update_project_worktrees(
164 &self,
165 project_id: ProjectId,
166 worktrees: &[proto::WorktreeMetadata],
167 tx: &DatabaseTransaction,
168 ) -> Result<()> {
169 if !worktrees.is_empty() {
170 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
171 id: ActiveValue::set(worktree.id as i64),
172 project_id: ActiveValue::set(project_id),
173 abs_path: ActiveValue::set(worktree.abs_path.clone()),
174 root_name: ActiveValue::set(worktree.root_name.clone()),
175 visible: ActiveValue::set(worktree.visible),
176 scan_id: ActiveValue::set(0),
177 completed_scan_id: ActiveValue::set(0),
178 }))
179 .on_conflict(
180 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
181 .update_column(worktree::Column::RootName)
182 .to_owned(),
183 )
184 .exec(&*tx)
185 .await?;
186 }
187
188 worktree::Entity::delete_many()
189 .filter(worktree::Column::ProjectId.eq(project_id).and(
190 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
191 ))
192 .exec(&*tx)
193 .await?;
194
195 Ok(())
196 }
197
198 pub async fn update_worktree(
199 &self,
200 update: &proto::UpdateWorktree,
201 connection: ConnectionId,
202 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
203 let project_id = ProjectId::from_proto(update.project_id);
204 let worktree_id = update.worktree_id as i64;
205 let room_id = self.room_id_for_project(project_id).await?;
206 self.room_transaction(room_id, |tx| async move {
207 // Ensure the update comes from the host.
208 let _project = project::Entity::find_by_id(project_id)
209 .filter(
210 Condition::all()
211 .add(project::Column::HostConnectionId.eq(connection.id as i32))
212 .add(
213 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
214 ),
215 )
216 .one(&*tx)
217 .await?
218 .ok_or_else(|| anyhow!("no such project"))?;
219
220 // Update metadata.
221 worktree::Entity::update(worktree::ActiveModel {
222 id: ActiveValue::set(worktree_id),
223 project_id: ActiveValue::set(project_id),
224 root_name: ActiveValue::set(update.root_name.clone()),
225 scan_id: ActiveValue::set(update.scan_id as i64),
226 completed_scan_id: if update.is_last_update {
227 ActiveValue::set(update.scan_id as i64)
228 } else {
229 ActiveValue::default()
230 },
231 abs_path: ActiveValue::set(update.abs_path.clone()),
232 ..Default::default()
233 })
234 .exec(&*tx)
235 .await?;
236
237 if !update.updated_entries.is_empty() {
238 worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
239 let mtime = entry.mtime.clone().unwrap_or_default();
240 worktree_entry::ActiveModel {
241 project_id: ActiveValue::set(project_id),
242 worktree_id: ActiveValue::set(worktree_id),
243 id: ActiveValue::set(entry.id as i64),
244 is_dir: ActiveValue::set(entry.is_dir),
245 path: ActiveValue::set(entry.path.clone()),
246 inode: ActiveValue::set(entry.inode as i64),
247 mtime_seconds: ActiveValue::set(mtime.seconds as i64),
248 mtime_nanos: ActiveValue::set(mtime.nanos as i32),
249 is_symlink: ActiveValue::set(entry.is_symlink),
250 is_ignored: ActiveValue::set(entry.is_ignored),
251 is_external: ActiveValue::set(entry.is_external),
252 git_status: ActiveValue::set(entry.git_status.map(|status| status as i64)),
253 is_deleted: ActiveValue::set(false),
254 scan_id: ActiveValue::set(update.scan_id as i64),
255 }
256 }))
257 .on_conflict(
258 OnConflict::columns([
259 worktree_entry::Column::ProjectId,
260 worktree_entry::Column::WorktreeId,
261 worktree_entry::Column::Id,
262 ])
263 .update_columns([
264 worktree_entry::Column::IsDir,
265 worktree_entry::Column::Path,
266 worktree_entry::Column::Inode,
267 worktree_entry::Column::MtimeSeconds,
268 worktree_entry::Column::MtimeNanos,
269 worktree_entry::Column::IsSymlink,
270 worktree_entry::Column::IsIgnored,
271 worktree_entry::Column::GitStatus,
272 worktree_entry::Column::ScanId,
273 ])
274 .to_owned(),
275 )
276 .exec(&*tx)
277 .await?;
278 }
279
280 if !update.removed_entries.is_empty() {
281 worktree_entry::Entity::update_many()
282 .filter(
283 worktree_entry::Column::ProjectId
284 .eq(project_id)
285 .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
286 .and(
287 worktree_entry::Column::Id
288 .is_in(update.removed_entries.iter().map(|id| *id as i64)),
289 ),
290 )
291 .set(worktree_entry::ActiveModel {
292 is_deleted: ActiveValue::Set(true),
293 scan_id: ActiveValue::Set(update.scan_id as i64),
294 ..Default::default()
295 })
296 .exec(&*tx)
297 .await?;
298 }
299
300 if !update.updated_repositories.is_empty() {
301 worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
302 |repository| worktree_repository::ActiveModel {
303 project_id: ActiveValue::set(project_id),
304 worktree_id: ActiveValue::set(worktree_id),
305 work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
306 scan_id: ActiveValue::set(update.scan_id as i64),
307 branch: ActiveValue::set(repository.branch.clone()),
308 is_deleted: ActiveValue::set(false),
309 },
310 ))
311 .on_conflict(
312 OnConflict::columns([
313 worktree_repository::Column::ProjectId,
314 worktree_repository::Column::WorktreeId,
315 worktree_repository::Column::WorkDirectoryId,
316 ])
317 .update_columns([
318 worktree_repository::Column::ScanId,
319 worktree_repository::Column::Branch,
320 ])
321 .to_owned(),
322 )
323 .exec(&*tx)
324 .await?;
325 }
326
327 if !update.removed_repositories.is_empty() {
328 worktree_repository::Entity::update_many()
329 .filter(
330 worktree_repository::Column::ProjectId
331 .eq(project_id)
332 .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
333 .and(
334 worktree_repository::Column::WorkDirectoryId
335 .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
336 ),
337 )
338 .set(worktree_repository::ActiveModel {
339 is_deleted: ActiveValue::Set(true),
340 scan_id: ActiveValue::Set(update.scan_id as i64),
341 ..Default::default()
342 })
343 .exec(&*tx)
344 .await?;
345 }
346
347 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
348 Ok(connection_ids)
349 })
350 .await
351 }
352
353 /// Updates the diagnostic summary for the given connection.
354 pub async fn update_diagnostic_summary(
355 &self,
356 update: &proto::UpdateDiagnosticSummary,
357 connection: ConnectionId,
358 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
359 let project_id = ProjectId::from_proto(update.project_id);
360 let worktree_id = update.worktree_id as i64;
361 let room_id = self.room_id_for_project(project_id).await?;
362 self.room_transaction(room_id, |tx| async move {
363 let summary = update
364 .summary
365 .as_ref()
366 .ok_or_else(|| anyhow!("invalid summary"))?;
367
368 // Ensure the update comes from the host.
369 let project = project::Entity::find_by_id(project_id)
370 .one(&*tx)
371 .await?
372 .ok_or_else(|| anyhow!("no such project"))?;
373 if project.host_connection()? != connection {
374 return Err(anyhow!("can't update a project hosted by someone else"))?;
375 }
376
377 // Update summary.
378 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
379 project_id: ActiveValue::set(project_id),
380 worktree_id: ActiveValue::set(worktree_id),
381 path: ActiveValue::set(summary.path.clone()),
382 language_server_id: ActiveValue::set(summary.language_server_id as i64),
383 error_count: ActiveValue::set(summary.error_count as i32),
384 warning_count: ActiveValue::set(summary.warning_count as i32),
385 ..Default::default()
386 })
387 .on_conflict(
388 OnConflict::columns([
389 worktree_diagnostic_summary::Column::ProjectId,
390 worktree_diagnostic_summary::Column::WorktreeId,
391 worktree_diagnostic_summary::Column::Path,
392 ])
393 .update_columns([
394 worktree_diagnostic_summary::Column::LanguageServerId,
395 worktree_diagnostic_summary::Column::ErrorCount,
396 worktree_diagnostic_summary::Column::WarningCount,
397 ])
398 .to_owned(),
399 )
400 .exec(&*tx)
401 .await?;
402
403 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
404 Ok(connection_ids)
405 })
406 .await
407 }
408
409 /// Starts the language server for the given connection.
410 pub async fn start_language_server(
411 &self,
412 update: &proto::StartLanguageServer,
413 connection: ConnectionId,
414 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
415 let project_id = ProjectId::from_proto(update.project_id);
416 let room_id = self.room_id_for_project(project_id).await?;
417 self.room_transaction(room_id, |tx| async move {
418 let server = update
419 .server
420 .as_ref()
421 .ok_or_else(|| anyhow!("invalid language server"))?;
422
423 // Ensure the update comes from the host.
424 let project = project::Entity::find_by_id(project_id)
425 .one(&*tx)
426 .await?
427 .ok_or_else(|| anyhow!("no such project"))?;
428 if project.host_connection()? != connection {
429 return Err(anyhow!("can't update a project hosted by someone else"))?;
430 }
431
432 // Add the newly-started language server.
433 language_server::Entity::insert(language_server::ActiveModel {
434 project_id: ActiveValue::set(project_id),
435 id: ActiveValue::set(server.id as i64),
436 name: ActiveValue::set(server.name.clone()),
437 ..Default::default()
438 })
439 .on_conflict(
440 OnConflict::columns([
441 language_server::Column::ProjectId,
442 language_server::Column::Id,
443 ])
444 .update_column(language_server::Column::Name)
445 .to_owned(),
446 )
447 .exec(&*tx)
448 .await?;
449
450 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
451 Ok(connection_ids)
452 })
453 .await
454 }
455
456 /// Updates the worktree settings for the given connection.
457 pub async fn update_worktree_settings(
458 &self,
459 update: &proto::UpdateWorktreeSettings,
460 connection: ConnectionId,
461 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
462 let project_id = ProjectId::from_proto(update.project_id);
463 let room_id = self.room_id_for_project(project_id).await?;
464 self.room_transaction(room_id, |tx| async move {
465 // Ensure the update comes from the host.
466 let project = project::Entity::find_by_id(project_id)
467 .one(&*tx)
468 .await?
469 .ok_or_else(|| anyhow!("no such project"))?;
470 if project.host_connection()? != connection {
471 return Err(anyhow!("can't update a project hosted by someone else"))?;
472 }
473
474 if let Some(content) = &update.content {
475 worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
476 project_id: ActiveValue::Set(project_id),
477 worktree_id: ActiveValue::Set(update.worktree_id as i64),
478 path: ActiveValue::Set(update.path.clone()),
479 content: ActiveValue::Set(content.clone()),
480 })
481 .on_conflict(
482 OnConflict::columns([
483 worktree_settings_file::Column::ProjectId,
484 worktree_settings_file::Column::WorktreeId,
485 worktree_settings_file::Column::Path,
486 ])
487 .update_column(worktree_settings_file::Column::Content)
488 .to_owned(),
489 )
490 .exec(&*tx)
491 .await?;
492 } else {
493 worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
494 project_id: ActiveValue::Set(project_id),
495 worktree_id: ActiveValue::Set(update.worktree_id as i64),
496 path: ActiveValue::Set(update.path.clone()),
497 ..Default::default()
498 })
499 .exec(&*tx)
500 .await?;
501 }
502
503 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
504 Ok(connection_ids)
505 })
506 .await
507 }
508
/// Adds the given connection to the specified project.
///
/// Inside a room transaction this registers `connection` as a non-host
/// collaborator of the project and then assembles a snapshot of the project:
/// its collaborators (including the new one), its worktrees with their
/// non-deleted entries, non-deleted repository entries, diagnostic summaries
/// and settings files, and its language servers.
///
/// Returns the assembled [`Project`] together with the replica id assigned
/// to the joining connection.
///
/// # Errors
///
/// Fails with "must join a room first" when no room participant answers on
/// `connection`, and with "no such project" when the project does not exist
/// or belongs to a different room than the participant. Database errors are
/// propagated.
pub async fn join_project(
    &self,
    project_id: ProjectId,
    connection: ConnectionId,
) -> Result<RoomGuard<(Project, ReplicaId)>> {
    let room_id = self.room_id_for_project(project_id).await?;
    self.room_transaction(room_id, |tx| async move {
        // Find the room participant that answered on this connection.
        let participant = room_participant::Entity::find()
            .filter(
                Condition::all()
                    .add(
                        room_participant::Column::AnsweringConnectionId
                            .eq(connection.id as i32),
                    )
                    .add(
                        room_participant::Column::AnsweringConnectionServerId
                            .eq(connection.owner_id as i32),
                    ),
            )
            .one(&*tx)
            .await?
            .ok_or_else(|| anyhow!("must join a room first"))?;

        // The project must live in the room the participant is in. The same
        // error is used as for a missing project.
        let project = project::Entity::find_by_id(project_id)
            .one(&*tx)
            .await?
            .ok_or_else(|| anyhow!("no such project"))?;
        if project.room_id != participant.room_id {
            return Err(anyhow!("no such project"))?;
        }

        // Assign the smallest replica id >= 1 that no existing collaborator
        // of this project is using.
        let mut collaborators = project
            .find_related(project_collaborator::Entity)
            .all(&*tx)
            .await?;
        let replica_ids = collaborators
            .iter()
            .map(|c| c.replica_id)
            .collect::<HashSet<_>>();
        let mut replica_id = ReplicaId(1);
        while replica_ids.contains(&replica_id) {
            replica_id.0 += 1;
        }
        let new_collaborator = project_collaborator::ActiveModel {
            project_id: ActiveValue::set(project_id),
            connection_id: ActiveValue::set(connection.id as i32),
            connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
            user_id: ActiveValue::set(participant.user_id),
            replica_id: ActiveValue::set(replica_id),
            is_host: ActiveValue::set(false),
            ..Default::default()
        }
        .insert(&*tx)
        .await?;
        // Include the joining connection in the returned collaborator list.
        collaborators.push(new_collaborator);

        // Load the worktrees, keyed by worktree id; their entries,
        // repositories, summaries and settings files are filled in below.
        let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
        let mut worktrees = db_worktrees
            .into_iter()
            .map(|db_worktree| {
                (
                    db_worktree.id as u64,
                    Worktree {
                        id: db_worktree.id as u64,
                        abs_path: db_worktree.abs_path,
                        root_name: db_worktree.root_name,
                        visible: db_worktree.visible,
                        entries: Default::default(),
                        repository_entries: Default::default(),
                        diagnostic_summaries: Default::default(),
                        settings_files: Default::default(),
                        scan_id: db_worktree.scan_id as u64,
                        completed_scan_id: db_worktree.completed_scan_id as u64,
                    },
                )
            })
            .collect::<BTreeMap<_, _>>();

        // Populate worktree entries.
        // Rows marked as deleted are skipped, as are rows whose worktree id
        // is not among the worktrees loaded above.
        {
            let mut db_entries = worktree_entry::Entity::find()
                .filter(
                    Condition::all()
                        .add(worktree_entry::Column::ProjectId.eq(project_id))
                        .add(worktree_entry::Column::IsDeleted.eq(false)),
                )
                .stream(&*tx)
                .await?;
            while let Some(db_entry) = db_entries.next().await {
                let db_entry = db_entry?;
                if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
                    worktree.entries.push(proto::Entry {
                        id: db_entry.id as u64,
                        is_dir: db_entry.is_dir,
                        path: db_entry.path,
                        inode: db_entry.inode as u64,
                        mtime: Some(proto::Timestamp {
                            seconds: db_entry.mtime_seconds as u64,
                            nanos: db_entry.mtime_nanos as u32,
                        }),
                        is_symlink: db_entry.is_symlink,
                        is_ignored: db_entry.is_ignored,
                        is_external: db_entry.is_external,
                        git_status: db_entry.git_status.map(|status| status as i32),
                    });
                }
            }
        }

        // Populate repository entries.
        // Rows marked as deleted are skipped; the rest are keyed by their
        // work directory id within the owning worktree.
        {
            let mut db_repository_entries = worktree_repository::Entity::find()
                .filter(
                    Condition::all()
                        .add(worktree_repository::Column::ProjectId.eq(project_id))
                        .add(worktree_repository::Column::IsDeleted.eq(false)),
                )
                .stream(&*tx)
                .await?;
            while let Some(db_repository_entry) = db_repository_entries.next().await {
                let db_repository_entry = db_repository_entry?;
                if let Some(worktree) =
                    worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
                {
                    worktree.repository_entries.insert(
                        db_repository_entry.work_directory_id as u64,
                        proto::RepositoryEntry {
                            work_directory_id: db_repository_entry.work_directory_id as u64,
                            branch: db_repository_entry.branch,
                        },
                    );
                }
            }
        }

        // Populate worktree diagnostic summaries.
        {
            let mut db_summaries = worktree_diagnostic_summary::Entity::find()
                .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
                .stream(&*tx)
                .await?;
            while let Some(db_summary) = db_summaries.next().await {
                let db_summary = db_summary?;
                if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
                    worktree
                        .diagnostic_summaries
                        .push(proto::DiagnosticSummary {
                            path: db_summary.path,
                            language_server_id: db_summary.language_server_id as u64,
                            error_count: db_summary.error_count as u32,
                            warning_count: db_summary.warning_count as u32,
                        });
                }
            }
        }

        // Populate worktree settings files.
        {
            let mut db_settings_files = worktree_settings_file::Entity::find()
                .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
                .stream(&*tx)
                .await?;
            while let Some(db_settings_file) = db_settings_files.next().await {
                let db_settings_file = db_settings_file?;
                if let Some(worktree) =
                    worktrees.get_mut(&(db_settings_file.worktree_id as u64))
                {
                    worktree.settings_files.push(WorktreeSettingsFile {
                        path: db_settings_file.path,
                        content: db_settings_file.content,
                    });
                }
            }
        }

        // Populate language servers.
        let language_servers = project
            .find_related(language_server::Entity)
            .all(&*tx)
            .await?;

        // Assemble the snapshot handed back to the caller.
        let project = Project {
            collaborators: collaborators
                .into_iter()
                .map(|collaborator| ProjectCollaborator {
                    connection_id: collaborator.connection(),
                    user_id: collaborator.user_id,
                    replica_id: collaborator.replica_id,
                    is_host: collaborator.is_host,
                })
                .collect(),
            worktrees,
            language_servers: language_servers
                .into_iter()
                .map(|language_server| proto::LanguageServer {
                    id: language_server.id as u64,
                    name: language_server.name,
                })
                .collect(),
        };
        Ok((project, replica_id as ReplicaId))
    })
    .await
}
714
715 /// Removes the given connection from the specified project.
716 pub async fn leave_project(
717 &self,
718 project_id: ProjectId,
719 connection: ConnectionId,
720 ) -> Result<RoomGuard<(proto::Room, LeftProject)>> {
721 let room_id = self.room_id_for_project(project_id).await?;
722 self.room_transaction(room_id, |tx| async move {
723 let result = project_collaborator::Entity::delete_many()
724 .filter(
725 Condition::all()
726 .add(project_collaborator::Column::ProjectId.eq(project_id))
727 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
728 .add(
729 project_collaborator::Column::ConnectionServerId
730 .eq(connection.owner_id as i32),
731 ),
732 )
733 .exec(&*tx)
734 .await?;
735 if result.rows_affected == 0 {
736 Err(anyhow!("not a collaborator on this project"))?;
737 }
738
739 let project = project::Entity::find_by_id(project_id)
740 .one(&*tx)
741 .await?
742 .ok_or_else(|| anyhow!("no such project"))?;
743 let collaborators = project
744 .find_related(project_collaborator::Entity)
745 .all(&*tx)
746 .await?;
747 let connection_ids = collaborators
748 .into_iter()
749 .map(|collaborator| collaborator.connection())
750 .collect();
751
752 follower::Entity::delete_many()
753 .filter(
754 Condition::any()
755 .add(
756 Condition::all()
757 .add(follower::Column::ProjectId.eq(Some(project_id)))
758 .add(
759 follower::Column::LeaderConnectionServerId
760 .eq(connection.owner_id),
761 )
762 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
763 )
764 .add(
765 Condition::all()
766 .add(follower::Column::ProjectId.eq(Some(project_id)))
767 .add(
768 follower::Column::FollowerConnectionServerId
769 .eq(connection.owner_id),
770 )
771 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
772 ),
773 )
774 .exec(&*tx)
775 .await?;
776
777 let room = self.get_room(project.room_id, &tx).await?;
778 let left_project = LeftProject {
779 id: project_id,
780 host_user_id: project.host_user_id,
781 host_connection_id: Some(project.host_connection()?),
782 connection_ids,
783 };
784 Ok((room, left_project))
785 })
786 .await
787 }
788
789 pub async fn check_user_is_project_host(
790 &self,
791 project_id: ProjectId,
792 connection_id: ConnectionId,
793 ) -> Result<()> {
794 let room_id = self.room_id_for_project(project_id).await?;
795 self.room_transaction(room_id, |tx| async move {
796 project_collaborator::Entity::find()
797 .filter(
798 Condition::all()
799 .add(project_collaborator::Column::ProjectId.eq(project_id))
800 .add(project_collaborator::Column::IsHost.eq(true))
801 .add(project_collaborator::Column::ConnectionId.eq(connection_id.id))
802 .add(
803 project_collaborator::Column::ConnectionServerId
804 .eq(connection_id.owner_id),
805 ),
806 )
807 .one(&*tx)
808 .await?
809 .ok_or_else(|| anyhow!("failed to read project host"))?;
810
811 Ok(())
812 })
813 .await
814 .map(|guard| guard.into_inner())
815 }
816
817 /// Returns the host connection for a read-only request to join a shared project.
818 pub async fn host_for_read_only_project_request(
819 &self,
820 project_id: ProjectId,
821 connection_id: ConnectionId,
822 ) -> Result<ConnectionId> {
823 let room_id = self.room_id_for_project(project_id).await?;
824 self.room_transaction(room_id, |tx| async move {
825 let current_participant = room_participant::Entity::find()
826 .filter(room_participant::Column::RoomId.eq(room_id))
827 .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
828 .one(&*tx)
829 .await?
830 .ok_or_else(|| anyhow!("no such room"))?;
831
832 if !current_participant
833 .role
834 .map_or(false, |role| role.can_read_projects())
835 {
836 Err(anyhow!("not authorized to read projects"))?;
837 }
838
839 let host = project_collaborator::Entity::find()
840 .filter(
841 project_collaborator::Column::ProjectId
842 .eq(project_id)
843 .and(project_collaborator::Column::IsHost.eq(true)),
844 )
845 .one(&*tx)
846 .await?
847 .ok_or_else(|| anyhow!("failed to read project host"))?;
848
849 Ok(host.connection())
850 })
851 .await
852 .map(|guard| guard.into_inner())
853 }
854
855 /// Returns the host connection for a request to join a shared project.
856 pub async fn host_for_mutating_project_request(
857 &self,
858 project_id: ProjectId,
859 connection_id: ConnectionId,
860 ) -> Result<ConnectionId> {
861 let room_id = self.room_id_for_project(project_id).await?;
862 self.room_transaction(room_id, |tx| async move {
863 let current_participant = room_participant::Entity::find()
864 .filter(room_participant::Column::RoomId.eq(room_id))
865 .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
866 .one(&*tx)
867 .await?
868 .ok_or_else(|| anyhow!("no such room"))?;
869
870 if !current_participant
871 .role
872 .map_or(false, |role| role.can_edit_projects())
873 {
874 Err(anyhow!("not authorized to edit projects"))?;
875 }
876
877 let host = project_collaborator::Entity::find()
878 .filter(
879 project_collaborator::Column::ProjectId
880 .eq(project_id)
881 .and(project_collaborator::Column::IsHost.eq(true)),
882 )
883 .one(&*tx)
884 .await?
885 .ok_or_else(|| anyhow!("failed to read project host"))?;
886
887 Ok(host.connection())
888 })
889 .await
890 .map(|guard| guard.into_inner())
891 }
892
893 pub async fn project_collaborators_for_buffer_update(
894 &self,
895 project_id: ProjectId,
896 connection_id: ConnectionId,
897 requires_write: bool,
898 ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
899 let room_id = self.room_id_for_project(project_id).await?;
900 self.room_transaction(room_id, |tx| async move {
901 let current_participant = room_participant::Entity::find()
902 .filter(room_participant::Column::RoomId.eq(room_id))
903 .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
904 .one(&*tx)
905 .await?
906 .ok_or_else(|| anyhow!("no such room"))?;
907
908 if requires_write
909 && !current_participant
910 .role
911 .map_or(false, |role| role.can_edit_projects())
912 {
913 Err(anyhow!("not authorized to edit projects"))?;
914 }
915
916 let collaborators = project_collaborator::Entity::find()
917 .filter(project_collaborator::Column::ProjectId.eq(project_id))
918 .all(&*tx)
919 .await?
920 .into_iter()
921 .map(|collaborator| ProjectCollaborator {
922 connection_id: collaborator.connection(),
923 user_id: collaborator.user_id,
924 replica_id: collaborator.replica_id,
925 is_host: collaborator.is_host,
926 })
927 .collect::<Vec<_>>();
928
929 if collaborators
930 .iter()
931 .any(|collaborator| collaborator.connection_id == connection_id)
932 {
933 Ok(collaborators)
934 } else {
935 Err(anyhow!("no such project"))?
936 }
937 })
938 .await
939 }
940
941 /// Returns the connection IDs in the given project.
942 ///
943 /// The provided `connection_id` must also be a collaborator in the project,
944 /// otherwise an error will be returned.
945 pub async fn project_connection_ids(
946 &self,
947 project_id: ProjectId,
948 connection_id: ConnectionId,
949 ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
950 let room_id = self.room_id_for_project(project_id).await?;
951 self.room_transaction(room_id, |tx| async move {
952 let mut collaborators = project_collaborator::Entity::find()
953 .filter(project_collaborator::Column::ProjectId.eq(project_id))
954 .stream(&*tx)
955 .await?;
956
957 let mut connection_ids = HashSet::default();
958 while let Some(collaborator) = collaborators.next().await {
959 let collaborator = collaborator?;
960 connection_ids.insert(collaborator.connection());
961 }
962
963 if connection_ids.contains(&connection_id) {
964 Ok(connection_ids)
965 } else {
966 Err(anyhow!("no such project"))?
967 }
968 })
969 .await
970 }
971
972 async fn project_guest_connection_ids(
973 &self,
974 project_id: ProjectId,
975 tx: &DatabaseTransaction,
976 ) -> Result<Vec<ConnectionId>> {
977 let mut collaborators = project_collaborator::Entity::find()
978 .filter(
979 project_collaborator::Column::ProjectId
980 .eq(project_id)
981 .and(project_collaborator::Column::IsHost.eq(false)),
982 )
983 .stream(tx)
984 .await?;
985
986 let mut guest_connection_ids = Vec::new();
987 while let Some(collaborator) = collaborators.next().await {
988 let collaborator = collaborator?;
989 guest_connection_ids.push(collaborator.connection());
990 }
991 Ok(guest_connection_ids)
992 }
993
994 /// Returns the [`RoomId`] for the given project.
995 pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<RoomId> {
996 self.transaction(|tx| async move {
997 let project = project::Entity::find_by_id(project_id)
998 .one(&*tx)
999 .await?
1000 .ok_or_else(|| anyhow!("project {} not found", project_id))?;
1001 Ok(project.room_id)
1002 })
1003 .await
1004 }
1005
1006 pub async fn check_room_participants(
1007 &self,
1008 room_id: RoomId,
1009 leader_id: ConnectionId,
1010 follower_id: ConnectionId,
1011 ) -> Result<()> {
1012 self.transaction(|tx| async move {
1013 use room_participant::Column;
1014
1015 let count = room_participant::Entity::find()
1016 .filter(
1017 Condition::all().add(Column::RoomId.eq(room_id)).add(
1018 Condition::any()
1019 .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1020 Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1021 ))
1022 .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1023 Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1024 )),
1025 ),
1026 )
1027 .count(&*tx)
1028 .await?;
1029
1030 if count < 2 {
1031 Err(anyhow!("not room participants"))?;
1032 }
1033
1034 Ok(())
1035 })
1036 .await
1037 }
1038
1039 /// Adds the given follower connection as a follower of the given leader connection.
1040 pub async fn follow(
1041 &self,
1042 room_id: RoomId,
1043 project_id: ProjectId,
1044 leader_connection: ConnectionId,
1045 follower_connection: ConnectionId,
1046 ) -> Result<RoomGuard<proto::Room>> {
1047 self.room_transaction(room_id, |tx| async move {
1048 follower::ActiveModel {
1049 room_id: ActiveValue::set(room_id),
1050 project_id: ActiveValue::set(project_id),
1051 leader_connection_server_id: ActiveValue::set(ServerId(
1052 leader_connection.owner_id as i32,
1053 )),
1054 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1055 follower_connection_server_id: ActiveValue::set(ServerId(
1056 follower_connection.owner_id as i32,
1057 )),
1058 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1059 ..Default::default()
1060 }
1061 .insert(&*tx)
1062 .await?;
1063
1064 let room = self.get_room(room_id, &*tx).await?;
1065 Ok(room)
1066 })
1067 .await
1068 }
1069
1070 /// Removes the given follower connection as a follower of the given leader connection.
1071 pub async fn unfollow(
1072 &self,
1073 room_id: RoomId,
1074 project_id: ProjectId,
1075 leader_connection: ConnectionId,
1076 follower_connection: ConnectionId,
1077 ) -> Result<RoomGuard<proto::Room>> {
1078 self.room_transaction(room_id, |tx| async move {
1079 follower::Entity::delete_many()
1080 .filter(
1081 Condition::all()
1082 .add(follower::Column::RoomId.eq(room_id))
1083 .add(follower::Column::ProjectId.eq(project_id))
1084 .add(
1085 follower::Column::LeaderConnectionServerId
1086 .eq(leader_connection.owner_id),
1087 )
1088 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1089 .add(
1090 follower::Column::FollowerConnectionServerId
1091 .eq(follower_connection.owner_id),
1092 )
1093 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1094 )
1095 .exec(&*tx)
1096 .await?;
1097
1098 let room = self.get_room(room_id, &*tx).await?;
1099 Ok(room)
1100 })
1101 .await
1102 }
1103}