1use super::*;
2
3impl Database {
4 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
5 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
6 enum QueryAs {
7 Count,
8 }
9
10 self.transaction(|tx| async move {
11 Ok(project::Entity::find()
12 .select_only()
13 .column_as(project::Column::Id.count(), QueryAs::Count)
14 .inner_join(user::Entity)
15 .filter(user::Column::Admin.eq(false))
16 .into_values::<_, QueryAs>()
17 .one(&*tx)
18 .await?
19 .unwrap_or(0i64) as usize)
20 })
21 .await
22 }
23
24 pub async fn share_project(
25 &self,
26 room_id: RoomId,
27 connection: ConnectionId,
28 worktrees: &[proto::WorktreeMetadata],
29 ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
30 self.room_transaction(room_id, |tx| async move {
31 let participant = room_participant::Entity::find()
32 .filter(
33 Condition::all()
34 .add(
35 room_participant::Column::AnsweringConnectionId
36 .eq(connection.id as i32),
37 )
38 .add(
39 room_participant::Column::AnsweringConnectionServerId
40 .eq(connection.owner_id as i32),
41 ),
42 )
43 .one(&*tx)
44 .await?
45 .ok_or_else(|| anyhow!("could not find participant"))?;
46 if participant.room_id != room_id {
47 return Err(anyhow!("shared project on unexpected room"))?;
48 }
49 if !participant
50 .role
51 .unwrap_or(ChannelRole::Member)
52 .can_share_projects()
53 {
54 return Err(anyhow!("guests cannot share projects"))?;
55 }
56
57 let project = project::ActiveModel {
58 room_id: ActiveValue::set(participant.room_id),
59 host_user_id: ActiveValue::set(participant.user_id),
60 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
61 host_connection_server_id: ActiveValue::set(Some(ServerId(
62 connection.owner_id as i32,
63 ))),
64 ..Default::default()
65 }
66 .insert(&*tx)
67 .await?;
68
69 if !worktrees.is_empty() {
70 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
71 worktree::ActiveModel {
72 id: ActiveValue::set(worktree.id as i64),
73 project_id: ActiveValue::set(project.id),
74 abs_path: ActiveValue::set(worktree.abs_path.clone()),
75 root_name: ActiveValue::set(worktree.root_name.clone()),
76 visible: ActiveValue::set(worktree.visible),
77 scan_id: ActiveValue::set(0),
78 completed_scan_id: ActiveValue::set(0),
79 }
80 }))
81 .exec(&*tx)
82 .await?;
83 }
84
85 project_collaborator::ActiveModel {
86 project_id: ActiveValue::set(project.id),
87 connection_id: ActiveValue::set(connection.id as i32),
88 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
89 user_id: ActiveValue::set(participant.user_id),
90 replica_id: ActiveValue::set(ReplicaId(0)),
91 is_host: ActiveValue::set(true),
92 ..Default::default()
93 }
94 .insert(&*tx)
95 .await?;
96
97 let room = self.get_room(room_id, &tx).await?;
98 Ok((project.id, room))
99 })
100 .await
101 }
102
103 pub async fn unshare_project(
104 &self,
105 project_id: ProjectId,
106 connection: ConnectionId,
107 ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
108 let room_id = self.room_id_for_project(project_id).await?;
109 self.room_transaction(room_id, |tx| async move {
110 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
111
112 let project = project::Entity::find_by_id(project_id)
113 .one(&*tx)
114 .await?
115 .ok_or_else(|| anyhow!("project not found"))?;
116 if project.host_connection()? == connection {
117 project::Entity::delete(project.into_active_model())
118 .exec(&*tx)
119 .await?;
120 let room = self.get_room(room_id, &tx).await?;
121 Ok((room, guest_connection_ids))
122 } else {
123 Err(anyhow!("cannot unshare a project hosted by another user"))?
124 }
125 })
126 .await
127 }
128
129 pub async fn update_project(
130 &self,
131 project_id: ProjectId,
132 connection: ConnectionId,
133 worktrees: &[proto::WorktreeMetadata],
134 ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
135 let room_id = self.room_id_for_project(project_id).await?;
136 self.room_transaction(room_id, |tx| async move {
137 let project = project::Entity::find_by_id(project_id)
138 .filter(
139 Condition::all()
140 .add(project::Column::HostConnectionId.eq(connection.id as i32))
141 .add(
142 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
143 ),
144 )
145 .one(&*tx)
146 .await?
147 .ok_or_else(|| anyhow!("no such project"))?;
148
149 self.update_project_worktrees(project.id, worktrees, &tx)
150 .await?;
151
152 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
153 let room = self.get_room(project.room_id, &tx).await?;
154 Ok((room, guest_connection_ids))
155 })
156 .await
157 }
158
159 pub(in crate::db) async fn update_project_worktrees(
160 &self,
161 project_id: ProjectId,
162 worktrees: &[proto::WorktreeMetadata],
163 tx: &DatabaseTransaction,
164 ) -> Result<()> {
165 if !worktrees.is_empty() {
166 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
167 id: ActiveValue::set(worktree.id as i64),
168 project_id: ActiveValue::set(project_id),
169 abs_path: ActiveValue::set(worktree.abs_path.clone()),
170 root_name: ActiveValue::set(worktree.root_name.clone()),
171 visible: ActiveValue::set(worktree.visible),
172 scan_id: ActiveValue::set(0),
173 completed_scan_id: ActiveValue::set(0),
174 }))
175 .on_conflict(
176 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
177 .update_column(worktree::Column::RootName)
178 .to_owned(),
179 )
180 .exec(&*tx)
181 .await?;
182 }
183
184 worktree::Entity::delete_many()
185 .filter(worktree::Column::ProjectId.eq(project_id).and(
186 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
187 ))
188 .exec(&*tx)
189 .await?;
190
191 Ok(())
192 }
193
194 pub async fn update_worktree(
195 &self,
196 update: &proto::UpdateWorktree,
197 connection: ConnectionId,
198 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
199 let project_id = ProjectId::from_proto(update.project_id);
200 let worktree_id = update.worktree_id as i64;
201 let room_id = self.room_id_for_project(project_id).await?;
202 self.room_transaction(room_id, |tx| async move {
203 // Ensure the update comes from the host.
204 let _project = project::Entity::find_by_id(project_id)
205 .filter(
206 Condition::all()
207 .add(project::Column::HostConnectionId.eq(connection.id as i32))
208 .add(
209 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
210 ),
211 )
212 .one(&*tx)
213 .await?
214 .ok_or_else(|| anyhow!("no such project"))?;
215
216 // Update metadata.
217 worktree::Entity::update(worktree::ActiveModel {
218 id: ActiveValue::set(worktree_id),
219 project_id: ActiveValue::set(project_id),
220 root_name: ActiveValue::set(update.root_name.clone()),
221 scan_id: ActiveValue::set(update.scan_id as i64),
222 completed_scan_id: if update.is_last_update {
223 ActiveValue::set(update.scan_id as i64)
224 } else {
225 ActiveValue::default()
226 },
227 abs_path: ActiveValue::set(update.abs_path.clone()),
228 ..Default::default()
229 })
230 .exec(&*tx)
231 .await?;
232
233 if !update.updated_entries.is_empty() {
234 worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
235 let mtime = entry.mtime.clone().unwrap_or_default();
236 worktree_entry::ActiveModel {
237 project_id: ActiveValue::set(project_id),
238 worktree_id: ActiveValue::set(worktree_id),
239 id: ActiveValue::set(entry.id as i64),
240 is_dir: ActiveValue::set(entry.is_dir),
241 path: ActiveValue::set(entry.path.clone()),
242 inode: ActiveValue::set(entry.inode as i64),
243 mtime_seconds: ActiveValue::set(mtime.seconds as i64),
244 mtime_nanos: ActiveValue::set(mtime.nanos as i32),
245 is_symlink: ActiveValue::set(entry.is_symlink),
246 is_ignored: ActiveValue::set(entry.is_ignored),
247 is_external: ActiveValue::set(entry.is_external),
248 git_status: ActiveValue::set(entry.git_status.map(|status| status as i64)),
249 is_deleted: ActiveValue::set(false),
250 scan_id: ActiveValue::set(update.scan_id as i64),
251 }
252 }))
253 .on_conflict(
254 OnConflict::columns([
255 worktree_entry::Column::ProjectId,
256 worktree_entry::Column::WorktreeId,
257 worktree_entry::Column::Id,
258 ])
259 .update_columns([
260 worktree_entry::Column::IsDir,
261 worktree_entry::Column::Path,
262 worktree_entry::Column::Inode,
263 worktree_entry::Column::MtimeSeconds,
264 worktree_entry::Column::MtimeNanos,
265 worktree_entry::Column::IsSymlink,
266 worktree_entry::Column::IsIgnored,
267 worktree_entry::Column::GitStatus,
268 worktree_entry::Column::ScanId,
269 ])
270 .to_owned(),
271 )
272 .exec(&*tx)
273 .await?;
274 }
275
276 if !update.removed_entries.is_empty() {
277 worktree_entry::Entity::update_many()
278 .filter(
279 worktree_entry::Column::ProjectId
280 .eq(project_id)
281 .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
282 .and(
283 worktree_entry::Column::Id
284 .is_in(update.removed_entries.iter().map(|id| *id as i64)),
285 ),
286 )
287 .set(worktree_entry::ActiveModel {
288 is_deleted: ActiveValue::Set(true),
289 scan_id: ActiveValue::Set(update.scan_id as i64),
290 ..Default::default()
291 })
292 .exec(&*tx)
293 .await?;
294 }
295
296 if !update.updated_repositories.is_empty() {
297 worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
298 |repository| worktree_repository::ActiveModel {
299 project_id: ActiveValue::set(project_id),
300 worktree_id: ActiveValue::set(worktree_id),
301 work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
302 scan_id: ActiveValue::set(update.scan_id as i64),
303 branch: ActiveValue::set(repository.branch.clone()),
304 is_deleted: ActiveValue::set(false),
305 },
306 ))
307 .on_conflict(
308 OnConflict::columns([
309 worktree_repository::Column::ProjectId,
310 worktree_repository::Column::WorktreeId,
311 worktree_repository::Column::WorkDirectoryId,
312 ])
313 .update_columns([
314 worktree_repository::Column::ScanId,
315 worktree_repository::Column::Branch,
316 ])
317 .to_owned(),
318 )
319 .exec(&*tx)
320 .await?;
321 }
322
323 if !update.removed_repositories.is_empty() {
324 worktree_repository::Entity::update_many()
325 .filter(
326 worktree_repository::Column::ProjectId
327 .eq(project_id)
328 .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
329 .and(
330 worktree_repository::Column::WorkDirectoryId
331 .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
332 ),
333 )
334 .set(worktree_repository::ActiveModel {
335 is_deleted: ActiveValue::Set(true),
336 scan_id: ActiveValue::Set(update.scan_id as i64),
337 ..Default::default()
338 })
339 .exec(&*tx)
340 .await?;
341 }
342
343 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
344 Ok(connection_ids)
345 })
346 .await
347 }
348
349 pub async fn update_diagnostic_summary(
350 &self,
351 update: &proto::UpdateDiagnosticSummary,
352 connection: ConnectionId,
353 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
354 let project_id = ProjectId::from_proto(update.project_id);
355 let worktree_id = update.worktree_id as i64;
356 let room_id = self.room_id_for_project(project_id).await?;
357 self.room_transaction(room_id, |tx| async move {
358 let summary = update
359 .summary
360 .as_ref()
361 .ok_or_else(|| anyhow!("invalid summary"))?;
362
363 // Ensure the update comes from the host.
364 let project = project::Entity::find_by_id(project_id)
365 .one(&*tx)
366 .await?
367 .ok_or_else(|| anyhow!("no such project"))?;
368 if project.host_connection()? != connection {
369 return Err(anyhow!("can't update a project hosted by someone else"))?;
370 }
371
372 // Update summary.
373 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
374 project_id: ActiveValue::set(project_id),
375 worktree_id: ActiveValue::set(worktree_id),
376 path: ActiveValue::set(summary.path.clone()),
377 language_server_id: ActiveValue::set(summary.language_server_id as i64),
378 error_count: ActiveValue::set(summary.error_count as i32),
379 warning_count: ActiveValue::set(summary.warning_count as i32),
380 ..Default::default()
381 })
382 .on_conflict(
383 OnConflict::columns([
384 worktree_diagnostic_summary::Column::ProjectId,
385 worktree_diagnostic_summary::Column::WorktreeId,
386 worktree_diagnostic_summary::Column::Path,
387 ])
388 .update_columns([
389 worktree_diagnostic_summary::Column::LanguageServerId,
390 worktree_diagnostic_summary::Column::ErrorCount,
391 worktree_diagnostic_summary::Column::WarningCount,
392 ])
393 .to_owned(),
394 )
395 .exec(&*tx)
396 .await?;
397
398 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
399 Ok(connection_ids)
400 })
401 .await
402 }
403
404 pub async fn start_language_server(
405 &self,
406 update: &proto::StartLanguageServer,
407 connection: ConnectionId,
408 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
409 let project_id = ProjectId::from_proto(update.project_id);
410 let room_id = self.room_id_for_project(project_id).await?;
411 self.room_transaction(room_id, |tx| async move {
412 let server = update
413 .server
414 .as_ref()
415 .ok_or_else(|| anyhow!("invalid language server"))?;
416
417 // Ensure the update comes from the host.
418 let project = project::Entity::find_by_id(project_id)
419 .one(&*tx)
420 .await?
421 .ok_or_else(|| anyhow!("no such project"))?;
422 if project.host_connection()? != connection {
423 return Err(anyhow!("can't update a project hosted by someone else"))?;
424 }
425
426 // Add the newly-started language server.
427 language_server::Entity::insert(language_server::ActiveModel {
428 project_id: ActiveValue::set(project_id),
429 id: ActiveValue::set(server.id as i64),
430 name: ActiveValue::set(server.name.clone()),
431 ..Default::default()
432 })
433 .on_conflict(
434 OnConflict::columns([
435 language_server::Column::ProjectId,
436 language_server::Column::Id,
437 ])
438 .update_column(language_server::Column::Name)
439 .to_owned(),
440 )
441 .exec(&*tx)
442 .await?;
443
444 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
445 Ok(connection_ids)
446 })
447 .await
448 }
449
450 pub async fn update_worktree_settings(
451 &self,
452 update: &proto::UpdateWorktreeSettings,
453 connection: ConnectionId,
454 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
455 let project_id = ProjectId::from_proto(update.project_id);
456 let room_id = self.room_id_for_project(project_id).await?;
457 self.room_transaction(room_id, |tx| async move {
458 // Ensure the update comes from the host.
459 let project = project::Entity::find_by_id(project_id)
460 .one(&*tx)
461 .await?
462 .ok_or_else(|| anyhow!("no such project"))?;
463 if project.host_connection()? != connection {
464 return Err(anyhow!("can't update a project hosted by someone else"))?;
465 }
466
467 if let Some(content) = &update.content {
468 worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
469 project_id: ActiveValue::Set(project_id),
470 worktree_id: ActiveValue::Set(update.worktree_id as i64),
471 path: ActiveValue::Set(update.path.clone()),
472 content: ActiveValue::Set(content.clone()),
473 })
474 .on_conflict(
475 OnConflict::columns([
476 worktree_settings_file::Column::ProjectId,
477 worktree_settings_file::Column::WorktreeId,
478 worktree_settings_file::Column::Path,
479 ])
480 .update_column(worktree_settings_file::Column::Content)
481 .to_owned(),
482 )
483 .exec(&*tx)
484 .await?;
485 } else {
486 worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
487 project_id: ActiveValue::Set(project_id),
488 worktree_id: ActiveValue::Set(update.worktree_id as i64),
489 path: ActiveValue::Set(update.path.clone()),
490 ..Default::default()
491 })
492 .exec(&*tx)
493 .await?;
494 }
495
496 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
497 Ok(connection_ids)
498 })
499 .await
500 }
501
502 pub async fn join_project(
503 &self,
504 project_id: ProjectId,
505 connection: ConnectionId,
506 ) -> Result<RoomGuard<(Project, ReplicaId)>> {
507 let room_id = self.room_id_for_project(project_id).await?;
508 self.room_transaction(room_id, |tx| async move {
509 let participant = room_participant::Entity::find()
510 .filter(
511 Condition::all()
512 .add(
513 room_participant::Column::AnsweringConnectionId
514 .eq(connection.id as i32),
515 )
516 .add(
517 room_participant::Column::AnsweringConnectionServerId
518 .eq(connection.owner_id as i32),
519 ),
520 )
521 .one(&*tx)
522 .await?
523 .ok_or_else(|| anyhow!("must join a room first"))?;
524
525 let project = project::Entity::find_by_id(project_id)
526 .one(&*tx)
527 .await?
528 .ok_or_else(|| anyhow!("no such project"))?;
529 if project.room_id != participant.room_id {
530 return Err(anyhow!("no such project"))?;
531 }
532
533 let mut collaborators = project
534 .find_related(project_collaborator::Entity)
535 .all(&*tx)
536 .await?;
537 let replica_ids = collaborators
538 .iter()
539 .map(|c| c.replica_id)
540 .collect::<HashSet<_>>();
541 let mut replica_id = ReplicaId(1);
542 while replica_ids.contains(&replica_id) {
543 replica_id.0 += 1;
544 }
545 let new_collaborator = project_collaborator::ActiveModel {
546 project_id: ActiveValue::set(project_id),
547 connection_id: ActiveValue::set(connection.id as i32),
548 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
549 user_id: ActiveValue::set(participant.user_id),
550 replica_id: ActiveValue::set(replica_id),
551 is_host: ActiveValue::set(false),
552 ..Default::default()
553 }
554 .insert(&*tx)
555 .await?;
556 collaborators.push(new_collaborator);
557
558 let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
559 let mut worktrees = db_worktrees
560 .into_iter()
561 .map(|db_worktree| {
562 (
563 db_worktree.id as u64,
564 Worktree {
565 id: db_worktree.id as u64,
566 abs_path: db_worktree.abs_path,
567 root_name: db_worktree.root_name,
568 visible: db_worktree.visible,
569 entries: Default::default(),
570 repository_entries: Default::default(),
571 diagnostic_summaries: Default::default(),
572 settings_files: Default::default(),
573 scan_id: db_worktree.scan_id as u64,
574 completed_scan_id: db_worktree.completed_scan_id as u64,
575 },
576 )
577 })
578 .collect::<BTreeMap<_, _>>();
579
580 // Populate worktree entries.
581 {
582 let mut db_entries = worktree_entry::Entity::find()
583 .filter(
584 Condition::all()
585 .add(worktree_entry::Column::ProjectId.eq(project_id))
586 .add(worktree_entry::Column::IsDeleted.eq(false)),
587 )
588 .stream(&*tx)
589 .await?;
590 while let Some(db_entry) = db_entries.next().await {
591 let db_entry = db_entry?;
592 if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
593 worktree.entries.push(proto::Entry {
594 id: db_entry.id as u64,
595 is_dir: db_entry.is_dir,
596 path: db_entry.path,
597 inode: db_entry.inode as u64,
598 mtime: Some(proto::Timestamp {
599 seconds: db_entry.mtime_seconds as u64,
600 nanos: db_entry.mtime_nanos as u32,
601 }),
602 is_symlink: db_entry.is_symlink,
603 is_ignored: db_entry.is_ignored,
604 is_external: db_entry.is_external,
605 git_status: db_entry.git_status.map(|status| status as i32),
606 });
607 }
608 }
609 }
610
611 // Populate repository entries.
612 {
613 let mut db_repository_entries = worktree_repository::Entity::find()
614 .filter(
615 Condition::all()
616 .add(worktree_repository::Column::ProjectId.eq(project_id))
617 .add(worktree_repository::Column::IsDeleted.eq(false)),
618 )
619 .stream(&*tx)
620 .await?;
621 while let Some(db_repository_entry) = db_repository_entries.next().await {
622 let db_repository_entry = db_repository_entry?;
623 if let Some(worktree) =
624 worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
625 {
626 worktree.repository_entries.insert(
627 db_repository_entry.work_directory_id as u64,
628 proto::RepositoryEntry {
629 work_directory_id: db_repository_entry.work_directory_id as u64,
630 branch: db_repository_entry.branch,
631 },
632 );
633 }
634 }
635 }
636
637 // Populate worktree diagnostic summaries.
638 {
639 let mut db_summaries = worktree_diagnostic_summary::Entity::find()
640 .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
641 .stream(&*tx)
642 .await?;
643 while let Some(db_summary) = db_summaries.next().await {
644 let db_summary = db_summary?;
645 if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
646 worktree
647 .diagnostic_summaries
648 .push(proto::DiagnosticSummary {
649 path: db_summary.path,
650 language_server_id: db_summary.language_server_id as u64,
651 error_count: db_summary.error_count as u32,
652 warning_count: db_summary.warning_count as u32,
653 });
654 }
655 }
656 }
657
658 // Populate worktree settings files
659 {
660 let mut db_settings_files = worktree_settings_file::Entity::find()
661 .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
662 .stream(&*tx)
663 .await?;
664 while let Some(db_settings_file) = db_settings_files.next().await {
665 let db_settings_file = db_settings_file?;
666 if let Some(worktree) =
667 worktrees.get_mut(&(db_settings_file.worktree_id as u64))
668 {
669 worktree.settings_files.push(WorktreeSettingsFile {
670 path: db_settings_file.path,
671 content: db_settings_file.content,
672 });
673 }
674 }
675 }
676
677 // Populate language servers.
678 let language_servers = project
679 .find_related(language_server::Entity)
680 .all(&*tx)
681 .await?;
682
683 let project = Project {
684 collaborators: collaborators
685 .into_iter()
686 .map(|collaborator| ProjectCollaborator {
687 connection_id: collaborator.connection(),
688 user_id: collaborator.user_id,
689 replica_id: collaborator.replica_id,
690 is_host: collaborator.is_host,
691 })
692 .collect(),
693 worktrees,
694 language_servers: language_servers
695 .into_iter()
696 .map(|language_server| proto::LanguageServer {
697 id: language_server.id as u64,
698 name: language_server.name,
699 })
700 .collect(),
701 };
702 Ok((project, replica_id as ReplicaId))
703 })
704 .await
705 }
706
707 pub async fn leave_project(
708 &self,
709 project_id: ProjectId,
710 connection: ConnectionId,
711 ) -> Result<RoomGuard<(proto::Room, LeftProject)>> {
712 let room_id = self.room_id_for_project(project_id).await?;
713 self.room_transaction(room_id, |tx| async move {
714 let result = project_collaborator::Entity::delete_many()
715 .filter(
716 Condition::all()
717 .add(project_collaborator::Column::ProjectId.eq(project_id))
718 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
719 .add(
720 project_collaborator::Column::ConnectionServerId
721 .eq(connection.owner_id as i32),
722 ),
723 )
724 .exec(&*tx)
725 .await?;
726 if result.rows_affected == 0 {
727 Err(anyhow!("not a collaborator on this project"))?;
728 }
729
730 let project = project::Entity::find_by_id(project_id)
731 .one(&*tx)
732 .await?
733 .ok_or_else(|| anyhow!("no such project"))?;
734 let collaborators = project
735 .find_related(project_collaborator::Entity)
736 .all(&*tx)
737 .await?;
738 let connection_ids = collaborators
739 .into_iter()
740 .map(|collaborator| collaborator.connection())
741 .collect();
742
743 follower::Entity::delete_many()
744 .filter(
745 Condition::any()
746 .add(
747 Condition::all()
748 .add(follower::Column::ProjectId.eq(Some(project_id)))
749 .add(
750 follower::Column::LeaderConnectionServerId
751 .eq(connection.owner_id),
752 )
753 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
754 )
755 .add(
756 Condition::all()
757 .add(follower::Column::ProjectId.eq(Some(project_id)))
758 .add(
759 follower::Column::FollowerConnectionServerId
760 .eq(connection.owner_id),
761 )
762 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
763 ),
764 )
765 .exec(&*tx)
766 .await?;
767
768 let room = self.get_room(project.room_id, &tx).await?;
769 let left_project = LeftProject {
770 id: project_id,
771 host_user_id: project.host_user_id,
772 host_connection_id: project.host_connection()?,
773 connection_ids,
774 };
775 Ok((room, left_project))
776 })
777 .await
778 }
779
780 pub async fn project_collaborators(
781 &self,
782 project_id: ProjectId,
783 connection_id: ConnectionId,
784 ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
785 let room_id = self.room_id_for_project(project_id).await?;
786 self.room_transaction(room_id, |tx| async move {
787 let collaborators = project_collaborator::Entity::find()
788 .filter(project_collaborator::Column::ProjectId.eq(project_id))
789 .all(&*tx)
790 .await?
791 .into_iter()
792 .map(|collaborator| ProjectCollaborator {
793 connection_id: collaborator.connection(),
794 user_id: collaborator.user_id,
795 replica_id: collaborator.replica_id,
796 is_host: collaborator.is_host,
797 })
798 .collect::<Vec<_>>();
799
800 if collaborators
801 .iter()
802 .any(|collaborator| collaborator.connection_id == connection_id)
803 {
804 Ok(collaborators)
805 } else {
806 Err(anyhow!("no such project"))?
807 }
808 })
809 .await
810 }
811
812 pub async fn project_connection_ids(
813 &self,
814 project_id: ProjectId,
815 connection_id: ConnectionId,
816 ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
817 let room_id = self.room_id_for_project(project_id).await?;
818 self.room_transaction(room_id, |tx| async move {
819 let mut collaborators = project_collaborator::Entity::find()
820 .filter(project_collaborator::Column::ProjectId.eq(project_id))
821 .stream(&*tx)
822 .await?;
823
824 let mut connection_ids = HashSet::default();
825 while let Some(collaborator) = collaborators.next().await {
826 let collaborator = collaborator?;
827 connection_ids.insert(collaborator.connection());
828 }
829
830 if connection_ids.contains(&connection_id) {
831 Ok(connection_ids)
832 } else {
833 Err(anyhow!("no such project"))?
834 }
835 })
836 .await
837 }
838
839 async fn project_guest_connection_ids(
840 &self,
841 project_id: ProjectId,
842 tx: &DatabaseTransaction,
843 ) -> Result<Vec<ConnectionId>> {
844 let mut collaborators = project_collaborator::Entity::find()
845 .filter(
846 project_collaborator::Column::ProjectId
847 .eq(project_id)
848 .and(project_collaborator::Column::IsHost.eq(false)),
849 )
850 .stream(tx)
851 .await?;
852
853 let mut guest_connection_ids = Vec::new();
854 while let Some(collaborator) = collaborators.next().await {
855 let collaborator = collaborator?;
856 guest_connection_ids.push(collaborator.connection());
857 }
858 Ok(guest_connection_ids)
859 }
860
861 pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<RoomId> {
862 self.transaction(|tx| async move {
863 let project = project::Entity::find_by_id(project_id)
864 .one(&*tx)
865 .await?
866 .ok_or_else(|| anyhow!("project {} not found", project_id))?;
867 Ok(project.room_id)
868 })
869 .await
870 }
871
872 pub async fn check_room_participants(
873 &self,
874 room_id: RoomId,
875 leader_id: ConnectionId,
876 follower_id: ConnectionId,
877 ) -> Result<()> {
878 self.transaction(|tx| async move {
879 use room_participant::Column;
880
881 let count = room_participant::Entity::find()
882 .filter(
883 Condition::all().add(Column::RoomId.eq(room_id)).add(
884 Condition::any()
885 .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
886 Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
887 ))
888 .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
889 Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
890 )),
891 ),
892 )
893 .count(&*tx)
894 .await?;
895
896 if count < 2 {
897 Err(anyhow!("not room participants"))?;
898 }
899
900 Ok(())
901 })
902 .await
903 }
904
905 pub async fn follow(
906 &self,
907 room_id: RoomId,
908 project_id: ProjectId,
909 leader_connection: ConnectionId,
910 follower_connection: ConnectionId,
911 ) -> Result<RoomGuard<proto::Room>> {
912 self.room_transaction(room_id, |tx| async move {
913 follower::ActiveModel {
914 room_id: ActiveValue::set(room_id),
915 project_id: ActiveValue::set(project_id),
916 leader_connection_server_id: ActiveValue::set(ServerId(
917 leader_connection.owner_id as i32,
918 )),
919 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
920 follower_connection_server_id: ActiveValue::set(ServerId(
921 follower_connection.owner_id as i32,
922 )),
923 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
924 ..Default::default()
925 }
926 .insert(&*tx)
927 .await?;
928
929 let room = self.get_room(room_id, &*tx).await?;
930 Ok(room)
931 })
932 .await
933 }
934
935 pub async fn unfollow(
936 &self,
937 room_id: RoomId,
938 project_id: ProjectId,
939 leader_connection: ConnectionId,
940 follower_connection: ConnectionId,
941 ) -> Result<RoomGuard<proto::Room>> {
942 self.room_transaction(room_id, |tx| async move {
943 follower::Entity::delete_many()
944 .filter(
945 Condition::all()
946 .add(follower::Column::RoomId.eq(room_id))
947 .add(follower::Column::ProjectId.eq(project_id))
948 .add(
949 follower::Column::LeaderConnectionServerId
950 .eq(leader_connection.owner_id),
951 )
952 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
953 .add(
954 follower::Column::FollowerConnectionServerId
955 .eq(follower_connection.owner_id),
956 )
957 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
958 )
959 .exec(&*tx)
960 .await?;
961
962 let room = self.get_room(room_id, &*tx).await?;
963 Ok(room)
964 })
965 .await
966 }
967}