1use super::*;
2
3impl Database {
4 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
5 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
6 enum QueryAs {
7 Count,
8 }
9
10 self.transaction(|tx| async move {
11 Ok(project::Entity::find()
12 .select_only()
13 .column_as(project::Column::Id.count(), QueryAs::Count)
14 .inner_join(user::Entity)
15 .filter(user::Column::Admin.eq(false))
16 .into_values::<_, QueryAs>()
17 .one(&*tx)
18 .await?
19 .unwrap_or(0i64) as usize)
20 })
21 .await
22 }
23
24 pub async fn share_project(
25 &self,
26 room_id: RoomId,
27 connection: ConnectionId,
28 worktrees: &[proto::WorktreeMetadata],
29 ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
30 self.room_transaction(room_id, |tx| async move {
31 let participant = room_participant::Entity::find()
32 .filter(
33 Condition::all()
34 .add(
35 room_participant::Column::AnsweringConnectionId
36 .eq(connection.id as i32),
37 )
38 .add(
39 room_participant::Column::AnsweringConnectionServerId
40 .eq(connection.owner_id as i32),
41 ),
42 )
43 .one(&*tx)
44 .await?
45 .ok_or_else(|| anyhow!("could not find participant"))?;
46 if participant.room_id != room_id {
47 return Err(anyhow!("shared project on unexpected room"))?;
48 }
49
50 let project = project::ActiveModel {
51 room_id: ActiveValue::set(participant.room_id),
52 host_user_id: ActiveValue::set(participant.user_id),
53 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
54 host_connection_server_id: ActiveValue::set(Some(ServerId(
55 connection.owner_id as i32,
56 ))),
57 ..Default::default()
58 }
59 .insert(&*tx)
60 .await?;
61
62 if !worktrees.is_empty() {
63 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
64 worktree::ActiveModel {
65 id: ActiveValue::set(worktree.id as i64),
66 project_id: ActiveValue::set(project.id),
67 abs_path: ActiveValue::set(worktree.abs_path.clone()),
68 root_name: ActiveValue::set(worktree.root_name.clone()),
69 visible: ActiveValue::set(worktree.visible),
70 scan_id: ActiveValue::set(0),
71 completed_scan_id: ActiveValue::set(0),
72 }
73 }))
74 .exec(&*tx)
75 .await?;
76 }
77
78 project_collaborator::ActiveModel {
79 project_id: ActiveValue::set(project.id),
80 connection_id: ActiveValue::set(connection.id as i32),
81 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
82 user_id: ActiveValue::set(participant.user_id),
83 replica_id: ActiveValue::set(ReplicaId(0)),
84 is_host: ActiveValue::set(true),
85 ..Default::default()
86 }
87 .insert(&*tx)
88 .await?;
89
90 let room = self.get_room(room_id, &tx).await?;
91 Ok((project.id, room))
92 })
93 .await
94 }
95
96 pub async fn unshare_project(
97 &self,
98 project_id: ProjectId,
99 connection: ConnectionId,
100 ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
101 let room_id = self.room_id_for_project(project_id).await?;
102 self.room_transaction(room_id, |tx| async move {
103 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
104
105 let project = project::Entity::find_by_id(project_id)
106 .one(&*tx)
107 .await?
108 .ok_or_else(|| anyhow!("project not found"))?;
109 if project.host_connection()? == connection {
110 project::Entity::delete(project.into_active_model())
111 .exec(&*tx)
112 .await?;
113 let room = self.get_room(room_id, &tx).await?;
114 Ok((room, guest_connection_ids))
115 } else {
116 Err(anyhow!("cannot unshare a project hosted by another user"))?
117 }
118 })
119 .await
120 }
121
122 pub async fn update_project(
123 &self,
124 project_id: ProjectId,
125 connection: ConnectionId,
126 worktrees: &[proto::WorktreeMetadata],
127 ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
128 let room_id = self.room_id_for_project(project_id).await?;
129 self.room_transaction(room_id, |tx| async move {
130 let project = project::Entity::find_by_id(project_id)
131 .filter(
132 Condition::all()
133 .add(project::Column::HostConnectionId.eq(connection.id as i32))
134 .add(
135 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
136 ),
137 )
138 .one(&*tx)
139 .await?
140 .ok_or_else(|| anyhow!("no such project"))?;
141
142 self.update_project_worktrees(project.id, worktrees, &tx)
143 .await?;
144
145 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
146 let room = self.get_room(project.room_id, &tx).await?;
147 Ok((room, guest_connection_ids))
148 })
149 .await
150 }
151
152 pub(in crate::db) async fn update_project_worktrees(
153 &self,
154 project_id: ProjectId,
155 worktrees: &[proto::WorktreeMetadata],
156 tx: &DatabaseTransaction,
157 ) -> Result<()> {
158 if !worktrees.is_empty() {
159 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
160 id: ActiveValue::set(worktree.id as i64),
161 project_id: ActiveValue::set(project_id),
162 abs_path: ActiveValue::set(worktree.abs_path.clone()),
163 root_name: ActiveValue::set(worktree.root_name.clone()),
164 visible: ActiveValue::set(worktree.visible),
165 scan_id: ActiveValue::set(0),
166 completed_scan_id: ActiveValue::set(0),
167 }))
168 .on_conflict(
169 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
170 .update_column(worktree::Column::RootName)
171 .to_owned(),
172 )
173 .exec(&*tx)
174 .await?;
175 }
176
177 worktree::Entity::delete_many()
178 .filter(worktree::Column::ProjectId.eq(project_id).and(
179 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
180 ))
181 .exec(&*tx)
182 .await?;
183
184 Ok(())
185 }
186
187 pub async fn update_worktree(
188 &self,
189 update: &proto::UpdateWorktree,
190 connection: ConnectionId,
191 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
192 let project_id = ProjectId::from_proto(update.project_id);
193 let worktree_id = update.worktree_id as i64;
194 let room_id = self.room_id_for_project(project_id).await?;
195 self.room_transaction(room_id, |tx| async move {
196 // Ensure the update comes from the host.
197 let _project = project::Entity::find_by_id(project_id)
198 .filter(
199 Condition::all()
200 .add(project::Column::HostConnectionId.eq(connection.id as i32))
201 .add(
202 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
203 ),
204 )
205 .one(&*tx)
206 .await?
207 .ok_or_else(|| anyhow!("no such project"))?;
208
209 // Update metadata.
210 worktree::Entity::update(worktree::ActiveModel {
211 id: ActiveValue::set(worktree_id),
212 project_id: ActiveValue::set(project_id),
213 root_name: ActiveValue::set(update.root_name.clone()),
214 scan_id: ActiveValue::set(update.scan_id as i64),
215 completed_scan_id: if update.is_last_update {
216 ActiveValue::set(update.scan_id as i64)
217 } else {
218 ActiveValue::default()
219 },
220 abs_path: ActiveValue::set(update.abs_path.clone()),
221 ..Default::default()
222 })
223 .exec(&*tx)
224 .await?;
225
226 if !update.updated_entries.is_empty() {
227 worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
228 let mtime = entry.mtime.clone().unwrap_or_default();
229 worktree_entry::ActiveModel {
230 project_id: ActiveValue::set(project_id),
231 worktree_id: ActiveValue::set(worktree_id),
232 id: ActiveValue::set(entry.id as i64),
233 is_dir: ActiveValue::set(entry.is_dir),
234 path: ActiveValue::set(entry.path.clone()),
235 inode: ActiveValue::set(entry.inode as i64),
236 mtime_seconds: ActiveValue::set(mtime.seconds as i64),
237 mtime_nanos: ActiveValue::set(mtime.nanos as i32),
238 is_symlink: ActiveValue::set(entry.is_symlink),
239 is_ignored: ActiveValue::set(entry.is_ignored),
240 is_external: ActiveValue::set(entry.is_external),
241 git_status: ActiveValue::set(entry.git_status.map(|status| status as i64)),
242 is_deleted: ActiveValue::set(false),
243 scan_id: ActiveValue::set(update.scan_id as i64),
244 }
245 }))
246 .on_conflict(
247 OnConflict::columns([
248 worktree_entry::Column::ProjectId,
249 worktree_entry::Column::WorktreeId,
250 worktree_entry::Column::Id,
251 ])
252 .update_columns([
253 worktree_entry::Column::IsDir,
254 worktree_entry::Column::Path,
255 worktree_entry::Column::Inode,
256 worktree_entry::Column::MtimeSeconds,
257 worktree_entry::Column::MtimeNanos,
258 worktree_entry::Column::IsSymlink,
259 worktree_entry::Column::IsIgnored,
260 worktree_entry::Column::GitStatus,
261 worktree_entry::Column::ScanId,
262 ])
263 .to_owned(),
264 )
265 .exec(&*tx)
266 .await?;
267 }
268
269 if !update.removed_entries.is_empty() {
270 worktree_entry::Entity::update_many()
271 .filter(
272 worktree_entry::Column::ProjectId
273 .eq(project_id)
274 .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
275 .and(
276 worktree_entry::Column::Id
277 .is_in(update.removed_entries.iter().map(|id| *id as i64)),
278 ),
279 )
280 .set(worktree_entry::ActiveModel {
281 is_deleted: ActiveValue::Set(true),
282 scan_id: ActiveValue::Set(update.scan_id as i64),
283 ..Default::default()
284 })
285 .exec(&*tx)
286 .await?;
287 }
288
289 if !update.updated_repositories.is_empty() {
290 worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
291 |repository| worktree_repository::ActiveModel {
292 project_id: ActiveValue::set(project_id),
293 worktree_id: ActiveValue::set(worktree_id),
294 work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
295 scan_id: ActiveValue::set(update.scan_id as i64),
296 branch: ActiveValue::set(repository.branch.clone()),
297 is_deleted: ActiveValue::set(false),
298 },
299 ))
300 .on_conflict(
301 OnConflict::columns([
302 worktree_repository::Column::ProjectId,
303 worktree_repository::Column::WorktreeId,
304 worktree_repository::Column::WorkDirectoryId,
305 ])
306 .update_columns([
307 worktree_repository::Column::ScanId,
308 worktree_repository::Column::Branch,
309 ])
310 .to_owned(),
311 )
312 .exec(&*tx)
313 .await?;
314 }
315
316 if !update.removed_repositories.is_empty() {
317 worktree_repository::Entity::update_many()
318 .filter(
319 worktree_repository::Column::ProjectId
320 .eq(project_id)
321 .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
322 .and(
323 worktree_repository::Column::WorkDirectoryId
324 .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
325 ),
326 )
327 .set(worktree_repository::ActiveModel {
328 is_deleted: ActiveValue::Set(true),
329 scan_id: ActiveValue::Set(update.scan_id as i64),
330 ..Default::default()
331 })
332 .exec(&*tx)
333 .await?;
334 }
335
336 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
337 Ok(connection_ids)
338 })
339 .await
340 }
341
342 pub async fn update_diagnostic_summary(
343 &self,
344 update: &proto::UpdateDiagnosticSummary,
345 connection: ConnectionId,
346 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
347 let project_id = ProjectId::from_proto(update.project_id);
348 let worktree_id = update.worktree_id as i64;
349 let room_id = self.room_id_for_project(project_id).await?;
350 self.room_transaction(room_id, |tx| async move {
351 let summary = update
352 .summary
353 .as_ref()
354 .ok_or_else(|| anyhow!("invalid summary"))?;
355
356 // Ensure the update comes from the host.
357 let project = project::Entity::find_by_id(project_id)
358 .one(&*tx)
359 .await?
360 .ok_or_else(|| anyhow!("no such project"))?;
361 if project.host_connection()? != connection {
362 return Err(anyhow!("can't update a project hosted by someone else"))?;
363 }
364
365 // Update summary.
366 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
367 project_id: ActiveValue::set(project_id),
368 worktree_id: ActiveValue::set(worktree_id),
369 path: ActiveValue::set(summary.path.clone()),
370 language_server_id: ActiveValue::set(summary.language_server_id as i64),
371 error_count: ActiveValue::set(summary.error_count as i32),
372 warning_count: ActiveValue::set(summary.warning_count as i32),
373 ..Default::default()
374 })
375 .on_conflict(
376 OnConflict::columns([
377 worktree_diagnostic_summary::Column::ProjectId,
378 worktree_diagnostic_summary::Column::WorktreeId,
379 worktree_diagnostic_summary::Column::Path,
380 ])
381 .update_columns([
382 worktree_diagnostic_summary::Column::LanguageServerId,
383 worktree_diagnostic_summary::Column::ErrorCount,
384 worktree_diagnostic_summary::Column::WarningCount,
385 ])
386 .to_owned(),
387 )
388 .exec(&*tx)
389 .await?;
390
391 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
392 Ok(connection_ids)
393 })
394 .await
395 }
396
397 pub async fn start_language_server(
398 &self,
399 update: &proto::StartLanguageServer,
400 connection: ConnectionId,
401 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
402 let project_id = ProjectId::from_proto(update.project_id);
403 let room_id = self.room_id_for_project(project_id).await?;
404 self.room_transaction(room_id, |tx| async move {
405 let server = update
406 .server
407 .as_ref()
408 .ok_or_else(|| anyhow!("invalid language server"))?;
409
410 // Ensure the update comes from the host.
411 let project = project::Entity::find_by_id(project_id)
412 .one(&*tx)
413 .await?
414 .ok_or_else(|| anyhow!("no such project"))?;
415 if project.host_connection()? != connection {
416 return Err(anyhow!("can't update a project hosted by someone else"))?;
417 }
418
419 // Add the newly-started language server.
420 language_server::Entity::insert(language_server::ActiveModel {
421 project_id: ActiveValue::set(project_id),
422 id: ActiveValue::set(server.id as i64),
423 name: ActiveValue::set(server.name.clone()),
424 ..Default::default()
425 })
426 .on_conflict(
427 OnConflict::columns([
428 language_server::Column::ProjectId,
429 language_server::Column::Id,
430 ])
431 .update_column(language_server::Column::Name)
432 .to_owned(),
433 )
434 .exec(&*tx)
435 .await?;
436
437 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
438 Ok(connection_ids)
439 })
440 .await
441 }
442
443 pub async fn update_worktree_settings(
444 &self,
445 update: &proto::UpdateWorktreeSettings,
446 connection: ConnectionId,
447 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
448 let project_id = ProjectId::from_proto(update.project_id);
449 let room_id = self.room_id_for_project(project_id).await?;
450 self.room_transaction(room_id, |tx| async move {
451 // Ensure the update comes from the host.
452 let project = project::Entity::find_by_id(project_id)
453 .one(&*tx)
454 .await?
455 .ok_or_else(|| anyhow!("no such project"))?;
456 if project.host_connection()? != connection {
457 return Err(anyhow!("can't update a project hosted by someone else"))?;
458 }
459
460 if let Some(content) = &update.content {
461 worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
462 project_id: ActiveValue::Set(project_id),
463 worktree_id: ActiveValue::Set(update.worktree_id as i64),
464 path: ActiveValue::Set(update.path.clone()),
465 content: ActiveValue::Set(content.clone()),
466 })
467 .on_conflict(
468 OnConflict::columns([
469 worktree_settings_file::Column::ProjectId,
470 worktree_settings_file::Column::WorktreeId,
471 worktree_settings_file::Column::Path,
472 ])
473 .update_column(worktree_settings_file::Column::Content)
474 .to_owned(),
475 )
476 .exec(&*tx)
477 .await?;
478 } else {
479 worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
480 project_id: ActiveValue::Set(project_id),
481 worktree_id: ActiveValue::Set(update.worktree_id as i64),
482 path: ActiveValue::Set(update.path.clone()),
483 ..Default::default()
484 })
485 .exec(&*tx)
486 .await?;
487 }
488
489 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
490 Ok(connection_ids)
491 })
492 .await
493 }
494
495 pub async fn join_project(
496 &self,
497 project_id: ProjectId,
498 connection: ConnectionId,
499 ) -> Result<RoomGuard<(Project, ReplicaId)>> {
500 let room_id = self.room_id_for_project(project_id).await?;
501 self.room_transaction(room_id, |tx| async move {
502 let participant = room_participant::Entity::find()
503 .filter(
504 Condition::all()
505 .add(
506 room_participant::Column::AnsweringConnectionId
507 .eq(connection.id as i32),
508 )
509 .add(
510 room_participant::Column::AnsweringConnectionServerId
511 .eq(connection.owner_id as i32),
512 ),
513 )
514 .one(&*tx)
515 .await?
516 .ok_or_else(|| anyhow!("must join a room first"))?;
517
518 let project = project::Entity::find_by_id(project_id)
519 .one(&*tx)
520 .await?
521 .ok_or_else(|| anyhow!("no such project"))?;
522 if project.room_id != participant.room_id {
523 return Err(anyhow!("no such project"))?;
524 }
525
526 let mut collaborators = project
527 .find_related(project_collaborator::Entity)
528 .all(&*tx)
529 .await?;
530 let replica_ids = collaborators
531 .iter()
532 .map(|c| c.replica_id)
533 .collect::<HashSet<_>>();
534 let mut replica_id = ReplicaId(1);
535 while replica_ids.contains(&replica_id) {
536 replica_id.0 += 1;
537 }
538 let new_collaborator = project_collaborator::ActiveModel {
539 project_id: ActiveValue::set(project_id),
540 connection_id: ActiveValue::set(connection.id as i32),
541 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
542 user_id: ActiveValue::set(participant.user_id),
543 replica_id: ActiveValue::set(replica_id),
544 is_host: ActiveValue::set(false),
545 ..Default::default()
546 }
547 .insert(&*tx)
548 .await?;
549 collaborators.push(new_collaborator);
550
551 let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
552 let mut worktrees = db_worktrees
553 .into_iter()
554 .map(|db_worktree| {
555 (
556 db_worktree.id as u64,
557 Worktree {
558 id: db_worktree.id as u64,
559 abs_path: db_worktree.abs_path,
560 root_name: db_worktree.root_name,
561 visible: db_worktree.visible,
562 entries: Default::default(),
563 repository_entries: Default::default(),
564 diagnostic_summaries: Default::default(),
565 settings_files: Default::default(),
566 scan_id: db_worktree.scan_id as u64,
567 completed_scan_id: db_worktree.completed_scan_id as u64,
568 },
569 )
570 })
571 .collect::<BTreeMap<_, _>>();
572
573 // Populate worktree entries.
574 {
575 let mut db_entries = worktree_entry::Entity::find()
576 .filter(
577 Condition::all()
578 .add(worktree_entry::Column::ProjectId.eq(project_id))
579 .add(worktree_entry::Column::IsDeleted.eq(false)),
580 )
581 .stream(&*tx)
582 .await?;
583 while let Some(db_entry) = db_entries.next().await {
584 let db_entry = db_entry?;
585 if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
586 worktree.entries.push(proto::Entry {
587 id: db_entry.id as u64,
588 is_dir: db_entry.is_dir,
589 path: db_entry.path,
590 inode: db_entry.inode as u64,
591 mtime: Some(proto::Timestamp {
592 seconds: db_entry.mtime_seconds as u64,
593 nanos: db_entry.mtime_nanos as u32,
594 }),
595 is_symlink: db_entry.is_symlink,
596 is_ignored: db_entry.is_ignored,
597 is_external: db_entry.is_external,
598 git_status: db_entry.git_status.map(|status| status as i32),
599 });
600 }
601 }
602 }
603
604 // Populate repository entries.
605 {
606 let mut db_repository_entries = worktree_repository::Entity::find()
607 .filter(
608 Condition::all()
609 .add(worktree_repository::Column::ProjectId.eq(project_id))
610 .add(worktree_repository::Column::IsDeleted.eq(false)),
611 )
612 .stream(&*tx)
613 .await?;
614 while let Some(db_repository_entry) = db_repository_entries.next().await {
615 let db_repository_entry = db_repository_entry?;
616 if let Some(worktree) =
617 worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
618 {
619 worktree.repository_entries.insert(
620 db_repository_entry.work_directory_id as u64,
621 proto::RepositoryEntry {
622 work_directory_id: db_repository_entry.work_directory_id as u64,
623 branch: db_repository_entry.branch,
624 },
625 );
626 }
627 }
628 }
629
630 // Populate worktree diagnostic summaries.
631 {
632 let mut db_summaries = worktree_diagnostic_summary::Entity::find()
633 .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
634 .stream(&*tx)
635 .await?;
636 while let Some(db_summary) = db_summaries.next().await {
637 let db_summary = db_summary?;
638 if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
639 worktree
640 .diagnostic_summaries
641 .push(proto::DiagnosticSummary {
642 path: db_summary.path,
643 language_server_id: db_summary.language_server_id as u64,
644 error_count: db_summary.error_count as u32,
645 warning_count: db_summary.warning_count as u32,
646 });
647 }
648 }
649 }
650
651 // Populate worktree settings files
652 {
653 let mut db_settings_files = worktree_settings_file::Entity::find()
654 .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
655 .stream(&*tx)
656 .await?;
657 while let Some(db_settings_file) = db_settings_files.next().await {
658 let db_settings_file = db_settings_file?;
659 if let Some(worktree) =
660 worktrees.get_mut(&(db_settings_file.worktree_id as u64))
661 {
662 worktree.settings_files.push(WorktreeSettingsFile {
663 path: db_settings_file.path,
664 content: db_settings_file.content,
665 });
666 }
667 }
668 }
669
670 // Populate language servers.
671 let language_servers = project
672 .find_related(language_server::Entity)
673 .all(&*tx)
674 .await?;
675
676 let project = Project {
677 collaborators: collaborators
678 .into_iter()
679 .map(|collaborator| ProjectCollaborator {
680 connection_id: collaborator.connection(),
681 user_id: collaborator.user_id,
682 replica_id: collaborator.replica_id,
683 is_host: collaborator.is_host,
684 })
685 .collect(),
686 worktrees,
687 language_servers: language_servers
688 .into_iter()
689 .map(|language_server| proto::LanguageServer {
690 id: language_server.id as u64,
691 name: language_server.name,
692 })
693 .collect(),
694 };
695 Ok((project, replica_id as ReplicaId))
696 })
697 .await
698 }
699
700 pub async fn leave_project(
701 &self,
702 project_id: ProjectId,
703 connection: ConnectionId,
704 ) -> Result<RoomGuard<(proto::Room, LeftProject)>> {
705 let room_id = self.room_id_for_project(project_id).await?;
706 self.room_transaction(room_id, |tx| async move {
707 let result = project_collaborator::Entity::delete_many()
708 .filter(
709 Condition::all()
710 .add(project_collaborator::Column::ProjectId.eq(project_id))
711 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
712 .add(
713 project_collaborator::Column::ConnectionServerId
714 .eq(connection.owner_id as i32),
715 ),
716 )
717 .exec(&*tx)
718 .await?;
719 if result.rows_affected == 0 {
720 Err(anyhow!("not a collaborator on this project"))?;
721 }
722
723 let project = project::Entity::find_by_id(project_id)
724 .one(&*tx)
725 .await?
726 .ok_or_else(|| anyhow!("no such project"))?;
727 let collaborators = project
728 .find_related(project_collaborator::Entity)
729 .all(&*tx)
730 .await?;
731 let connection_ids = collaborators
732 .into_iter()
733 .map(|collaborator| collaborator.connection())
734 .collect();
735
736 follower::Entity::delete_many()
737 .filter(
738 Condition::any()
739 .add(
740 Condition::all()
741 .add(follower::Column::ProjectId.eq(Some(project_id)))
742 .add(
743 follower::Column::LeaderConnectionServerId
744 .eq(connection.owner_id),
745 )
746 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
747 )
748 .add(
749 Condition::all()
750 .add(follower::Column::ProjectId.eq(Some(project_id)))
751 .add(
752 follower::Column::FollowerConnectionServerId
753 .eq(connection.owner_id),
754 )
755 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
756 ),
757 )
758 .exec(&*tx)
759 .await?;
760
761 let room = self.get_room(project.room_id, &tx).await?;
762 let left_project = LeftProject {
763 id: project_id,
764 host_user_id: project.host_user_id,
765 host_connection_id: project.host_connection()?,
766 connection_ids,
767 };
768 Ok((room, left_project))
769 })
770 .await
771 }
772
773 pub async fn project_collaborators(
774 &self,
775 project_id: ProjectId,
776 connection_id: ConnectionId,
777 ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
778 let room_id = self.room_id_for_project(project_id).await?;
779 self.room_transaction(room_id, |tx| async move {
780 let collaborators = project_collaborator::Entity::find()
781 .filter(project_collaborator::Column::ProjectId.eq(project_id))
782 .all(&*tx)
783 .await?
784 .into_iter()
785 .map(|collaborator| ProjectCollaborator {
786 connection_id: collaborator.connection(),
787 user_id: collaborator.user_id,
788 replica_id: collaborator.replica_id,
789 is_host: collaborator.is_host,
790 })
791 .collect::<Vec<_>>();
792
793 if collaborators
794 .iter()
795 .any(|collaborator| collaborator.connection_id == connection_id)
796 {
797 Ok(collaborators)
798 } else {
799 Err(anyhow!("no such project"))?
800 }
801 })
802 .await
803 }
804
805 pub async fn project_connection_ids(
806 &self,
807 project_id: ProjectId,
808 connection_id: ConnectionId,
809 ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
810 let room_id = self.room_id_for_project(project_id).await?;
811 self.room_transaction(room_id, |tx| async move {
812 let mut collaborators = project_collaborator::Entity::find()
813 .filter(project_collaborator::Column::ProjectId.eq(project_id))
814 .stream(&*tx)
815 .await?;
816
817 let mut connection_ids = HashSet::default();
818 while let Some(collaborator) = collaborators.next().await {
819 let collaborator = collaborator?;
820 connection_ids.insert(collaborator.connection());
821 }
822
823 if connection_ids.contains(&connection_id) {
824 Ok(connection_ids)
825 } else {
826 Err(anyhow!("no such project"))?
827 }
828 })
829 .await
830 }
831
832 async fn project_guest_connection_ids(
833 &self,
834 project_id: ProjectId,
835 tx: &DatabaseTransaction,
836 ) -> Result<Vec<ConnectionId>> {
837 let mut collaborators = project_collaborator::Entity::find()
838 .filter(
839 project_collaborator::Column::ProjectId
840 .eq(project_id)
841 .and(project_collaborator::Column::IsHost.eq(false)),
842 )
843 .stream(tx)
844 .await?;
845
846 let mut guest_connection_ids = Vec::new();
847 while let Some(collaborator) = collaborators.next().await {
848 let collaborator = collaborator?;
849 guest_connection_ids.push(collaborator.connection());
850 }
851 Ok(guest_connection_ids)
852 }
853
854 pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<RoomId> {
855 self.transaction(|tx| async move {
856 let project = project::Entity::find_by_id(project_id)
857 .one(&*tx)
858 .await?
859 .ok_or_else(|| anyhow!("project {} not found", project_id))?;
860 Ok(project.room_id)
861 })
862 .await
863 }
864
865 pub async fn check_room_participants(
866 &self,
867 room_id: RoomId,
868 leader_id: ConnectionId,
869 follower_id: ConnectionId,
870 ) -> Result<()> {
871 self.transaction(|tx| async move {
872 use room_participant::Column;
873
874 let count = room_participant::Entity::find()
875 .filter(
876 Condition::all().add(Column::RoomId.eq(room_id)).add(
877 Condition::any()
878 .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
879 Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
880 ))
881 .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
882 Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
883 )),
884 ),
885 )
886 .count(&*tx)
887 .await?;
888
889 if count < 2 {
890 Err(anyhow!("not room participants"))?;
891 }
892
893 Ok(())
894 })
895 .await
896 }
897
898 pub async fn follow(
899 &self,
900 room_id: RoomId,
901 project_id: ProjectId,
902 leader_connection: ConnectionId,
903 follower_connection: ConnectionId,
904 ) -> Result<RoomGuard<proto::Room>> {
905 self.room_transaction(room_id, |tx| async move {
906 follower::ActiveModel {
907 room_id: ActiveValue::set(room_id),
908 project_id: ActiveValue::set(project_id),
909 leader_connection_server_id: ActiveValue::set(ServerId(
910 leader_connection.owner_id as i32,
911 )),
912 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
913 follower_connection_server_id: ActiveValue::set(ServerId(
914 follower_connection.owner_id as i32,
915 )),
916 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
917 ..Default::default()
918 }
919 .insert(&*tx)
920 .await?;
921
922 let room = self.get_room(room_id, &*tx).await?;
923 Ok(room)
924 })
925 .await
926 }
927
928 pub async fn unfollow(
929 &self,
930 room_id: RoomId,
931 project_id: ProjectId,
932 leader_connection: ConnectionId,
933 follower_connection: ConnectionId,
934 ) -> Result<RoomGuard<proto::Room>> {
935 self.room_transaction(room_id, |tx| async move {
936 follower::Entity::delete_many()
937 .filter(
938 Condition::all()
939 .add(follower::Column::RoomId.eq(room_id))
940 .add(follower::Column::ProjectId.eq(project_id))
941 .add(
942 follower::Column::LeaderConnectionServerId
943 .eq(leader_connection.owner_id),
944 )
945 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
946 .add(
947 follower::Column::FollowerConnectionServerId
948 .eq(follower_connection.owner_id),
949 )
950 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
951 )
952 .exec(&*tx)
953 .await?;
954
955 let room = self.get_room(room_id, &*tx).await?;
956 Ok(room)
957 })
958 .await
959 }
960}