1use super::*;
2
3impl Database {
4 pub async fn project_count_excluding_admins(&self) -> Result<usize> {
5 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
6 enum QueryAs {
7 Count,
8 }
9
10 self.transaction(|tx| async move {
11 Ok(project::Entity::find()
12 .select_only()
13 .column_as(project::Column::Id.count(), QueryAs::Count)
14 .inner_join(user::Entity)
15 .filter(user::Column::Admin.eq(false))
16 .into_values::<_, QueryAs>()
17 .one(&*tx)
18 .await?
19 .unwrap_or(0i64) as usize)
20 })
21 .await
22 }
23
24 pub async fn share_project(
25 &self,
26 room_id: RoomId,
27 connection: ConnectionId,
28 worktrees: &[proto::WorktreeMetadata],
29 ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
30 self.room_transaction(room_id, |tx| async move {
31 let participant = room_participant::Entity::find()
32 .filter(
33 Condition::all()
34 .add(
35 room_participant::Column::AnsweringConnectionId
36 .eq(connection.id as i32),
37 )
38 .add(
39 room_participant::Column::AnsweringConnectionServerId
40 .eq(connection.owner_id as i32),
41 ),
42 )
43 .one(&*tx)
44 .await?
45 .ok_or_else(|| anyhow!("could not find participant"))?;
46 if participant.room_id != room_id {
47 return Err(anyhow!("shared project on unexpected room"))?;
48 }
49 if !participant
50 .role
51 .unwrap_or(ChannelRole::Member)
52 .can_share_projects()
53 {
54 return Err(anyhow!("guests cannot share projects"))?;
55 }
56
57 let project = project::ActiveModel {
58 room_id: ActiveValue::set(participant.room_id),
59 host_user_id: ActiveValue::set(participant.user_id),
60 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
61 host_connection_server_id: ActiveValue::set(Some(ServerId(
62 connection.owner_id as i32,
63 ))),
64 ..Default::default()
65 }
66 .insert(&*tx)
67 .await?;
68
69 if !worktrees.is_empty() {
70 worktree::Entity::insert_many(worktrees.iter().map(|worktree| {
71 worktree::ActiveModel {
72 id: ActiveValue::set(worktree.id as i64),
73 project_id: ActiveValue::set(project.id),
74 abs_path: ActiveValue::set(worktree.abs_path.clone()),
75 root_name: ActiveValue::set(worktree.root_name.clone()),
76 visible: ActiveValue::set(worktree.visible),
77 scan_id: ActiveValue::set(0),
78 completed_scan_id: ActiveValue::set(0),
79 }
80 }))
81 .exec(&*tx)
82 .await?;
83 }
84
85 project_collaborator::ActiveModel {
86 project_id: ActiveValue::set(project.id),
87 connection_id: ActiveValue::set(connection.id as i32),
88 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
89 user_id: ActiveValue::set(participant.user_id),
90 replica_id: ActiveValue::set(ReplicaId(0)),
91 is_host: ActiveValue::set(true),
92 ..Default::default()
93 }
94 .insert(&*tx)
95 .await?;
96
97 let room = self.get_room(room_id, &tx).await?;
98 Ok((project.id, room))
99 })
100 .await
101 }
102
103 pub async fn unshare_project(
104 &self,
105 project_id: ProjectId,
106 connection: ConnectionId,
107 ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
108 let room_id = self.room_id_for_project(project_id).await?;
109 self.room_transaction(room_id, |tx| async move {
110 let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
111
112 let project = project::Entity::find_by_id(project_id)
113 .one(&*tx)
114 .await?
115 .ok_or_else(|| anyhow!("project not found"))?;
116 if project.host_connection()? == connection {
117 project::Entity::delete(project.into_active_model())
118 .exec(&*tx)
119 .await?;
120 let room = self.get_room(room_id, &tx).await?;
121 Ok((room, guest_connection_ids))
122 } else {
123 Err(anyhow!("cannot unshare a project hosted by another user"))?
124 }
125 })
126 .await
127 }
128
129 pub async fn update_project(
130 &self,
131 project_id: ProjectId,
132 connection: ConnectionId,
133 worktrees: &[proto::WorktreeMetadata],
134 ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
135 let room_id = self.room_id_for_project(project_id).await?;
136 self.room_transaction(room_id, |tx| async move {
137 let project = project::Entity::find_by_id(project_id)
138 .filter(
139 Condition::all()
140 .add(project::Column::HostConnectionId.eq(connection.id as i32))
141 .add(
142 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
143 ),
144 )
145 .one(&*tx)
146 .await?
147 .ok_or_else(|| anyhow!("no such project"))?;
148
149 self.update_project_worktrees(project.id, worktrees, &tx)
150 .await?;
151
152 let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
153 let room = self.get_room(project.room_id, &tx).await?;
154 Ok((room, guest_connection_ids))
155 })
156 .await
157 }
158
159 pub(in crate::db) async fn update_project_worktrees(
160 &self,
161 project_id: ProjectId,
162 worktrees: &[proto::WorktreeMetadata],
163 tx: &DatabaseTransaction,
164 ) -> Result<()> {
165 if !worktrees.is_empty() {
166 worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
167 id: ActiveValue::set(worktree.id as i64),
168 project_id: ActiveValue::set(project_id),
169 abs_path: ActiveValue::set(worktree.abs_path.clone()),
170 root_name: ActiveValue::set(worktree.root_name.clone()),
171 visible: ActiveValue::set(worktree.visible),
172 scan_id: ActiveValue::set(0),
173 completed_scan_id: ActiveValue::set(0),
174 }))
175 .on_conflict(
176 OnConflict::columns([worktree::Column::ProjectId, worktree::Column::Id])
177 .update_column(worktree::Column::RootName)
178 .to_owned(),
179 )
180 .exec(&*tx)
181 .await?;
182 }
183
184 worktree::Entity::delete_many()
185 .filter(worktree::Column::ProjectId.eq(project_id).and(
186 worktree::Column::Id.is_not_in(worktrees.iter().map(|worktree| worktree.id as i64)),
187 ))
188 .exec(&*tx)
189 .await?;
190
191 Ok(())
192 }
193
194 pub async fn update_worktree(
195 &self,
196 update: &proto::UpdateWorktree,
197 connection: ConnectionId,
198 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
199 let project_id = ProjectId::from_proto(update.project_id);
200 let worktree_id = update.worktree_id as i64;
201 let room_id = self.room_id_for_project(project_id).await?;
202 self.room_transaction(room_id, |tx| async move {
203 // Ensure the update comes from the host.
204 let _project = project::Entity::find_by_id(project_id)
205 .filter(
206 Condition::all()
207 .add(project::Column::HostConnectionId.eq(connection.id as i32))
208 .add(
209 project::Column::HostConnectionServerId.eq(connection.owner_id as i32),
210 ),
211 )
212 .one(&*tx)
213 .await?
214 .ok_or_else(|| anyhow!("no such project"))?;
215
216 // Update metadata.
217 worktree::Entity::update(worktree::ActiveModel {
218 id: ActiveValue::set(worktree_id),
219 project_id: ActiveValue::set(project_id),
220 root_name: ActiveValue::set(update.root_name.clone()),
221 scan_id: ActiveValue::set(update.scan_id as i64),
222 completed_scan_id: if update.is_last_update {
223 ActiveValue::set(update.scan_id as i64)
224 } else {
225 ActiveValue::default()
226 },
227 abs_path: ActiveValue::set(update.abs_path.clone()),
228 ..Default::default()
229 })
230 .exec(&*tx)
231 .await?;
232
233 if !update.updated_entries.is_empty() {
234 worktree_entry::Entity::insert_many(update.updated_entries.iter().map(|entry| {
235 let mtime = entry.mtime.clone().unwrap_or_default();
236 worktree_entry::ActiveModel {
237 project_id: ActiveValue::set(project_id),
238 worktree_id: ActiveValue::set(worktree_id),
239 id: ActiveValue::set(entry.id as i64),
240 is_dir: ActiveValue::set(entry.is_dir),
241 path: ActiveValue::set(entry.path.clone()),
242 inode: ActiveValue::set(entry.inode as i64),
243 mtime_seconds: ActiveValue::set(mtime.seconds as i64),
244 mtime_nanos: ActiveValue::set(mtime.nanos as i32),
245 is_symlink: ActiveValue::set(entry.is_symlink),
246 is_ignored: ActiveValue::set(entry.is_ignored),
247 is_external: ActiveValue::set(entry.is_external),
248 git_status: ActiveValue::set(entry.git_status.map(|status| status as i64)),
249 is_deleted: ActiveValue::set(false),
250 scan_id: ActiveValue::set(update.scan_id as i64),
251 }
252 }))
253 .on_conflict(
254 OnConflict::columns([
255 worktree_entry::Column::ProjectId,
256 worktree_entry::Column::WorktreeId,
257 worktree_entry::Column::Id,
258 ])
259 .update_columns([
260 worktree_entry::Column::IsDir,
261 worktree_entry::Column::Path,
262 worktree_entry::Column::Inode,
263 worktree_entry::Column::MtimeSeconds,
264 worktree_entry::Column::MtimeNanos,
265 worktree_entry::Column::IsSymlink,
266 worktree_entry::Column::IsIgnored,
267 worktree_entry::Column::GitStatus,
268 worktree_entry::Column::ScanId,
269 ])
270 .to_owned(),
271 )
272 .exec(&*tx)
273 .await?;
274 }
275
276 if !update.removed_entries.is_empty() {
277 worktree_entry::Entity::update_many()
278 .filter(
279 worktree_entry::Column::ProjectId
280 .eq(project_id)
281 .and(worktree_entry::Column::WorktreeId.eq(worktree_id))
282 .and(
283 worktree_entry::Column::Id
284 .is_in(update.removed_entries.iter().map(|id| *id as i64)),
285 ),
286 )
287 .set(worktree_entry::ActiveModel {
288 is_deleted: ActiveValue::Set(true),
289 scan_id: ActiveValue::Set(update.scan_id as i64),
290 ..Default::default()
291 })
292 .exec(&*tx)
293 .await?;
294 }
295
296 if !update.updated_repositories.is_empty() {
297 worktree_repository::Entity::insert_many(update.updated_repositories.iter().map(
298 |repository| worktree_repository::ActiveModel {
299 project_id: ActiveValue::set(project_id),
300 worktree_id: ActiveValue::set(worktree_id),
301 work_directory_id: ActiveValue::set(repository.work_directory_id as i64),
302 scan_id: ActiveValue::set(update.scan_id as i64),
303 branch: ActiveValue::set(repository.branch.clone()),
304 is_deleted: ActiveValue::set(false),
305 },
306 ))
307 .on_conflict(
308 OnConflict::columns([
309 worktree_repository::Column::ProjectId,
310 worktree_repository::Column::WorktreeId,
311 worktree_repository::Column::WorkDirectoryId,
312 ])
313 .update_columns([
314 worktree_repository::Column::ScanId,
315 worktree_repository::Column::Branch,
316 ])
317 .to_owned(),
318 )
319 .exec(&*tx)
320 .await?;
321 }
322
323 if !update.removed_repositories.is_empty() {
324 worktree_repository::Entity::update_many()
325 .filter(
326 worktree_repository::Column::ProjectId
327 .eq(project_id)
328 .and(worktree_repository::Column::WorktreeId.eq(worktree_id))
329 .and(
330 worktree_repository::Column::WorkDirectoryId
331 .is_in(update.removed_repositories.iter().map(|id| *id as i64)),
332 ),
333 )
334 .set(worktree_repository::ActiveModel {
335 is_deleted: ActiveValue::Set(true),
336 scan_id: ActiveValue::Set(update.scan_id as i64),
337 ..Default::default()
338 })
339 .exec(&*tx)
340 .await?;
341 }
342
343 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
344 Ok(connection_ids)
345 })
346 .await
347 }
348
349 pub async fn update_diagnostic_summary(
350 &self,
351 update: &proto::UpdateDiagnosticSummary,
352 connection: ConnectionId,
353 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
354 let project_id = ProjectId::from_proto(update.project_id);
355 let worktree_id = update.worktree_id as i64;
356 let room_id = self.room_id_for_project(project_id).await?;
357 self.room_transaction(room_id, |tx| async move {
358 let summary = update
359 .summary
360 .as_ref()
361 .ok_or_else(|| anyhow!("invalid summary"))?;
362
363 // Ensure the update comes from the host.
364 let project = project::Entity::find_by_id(project_id)
365 .one(&*tx)
366 .await?
367 .ok_or_else(|| anyhow!("no such project"))?;
368 if project.host_connection()? != connection {
369 return Err(anyhow!("can't update a project hosted by someone else"))?;
370 }
371
372 // Update summary.
373 worktree_diagnostic_summary::Entity::insert(worktree_diagnostic_summary::ActiveModel {
374 project_id: ActiveValue::set(project_id),
375 worktree_id: ActiveValue::set(worktree_id),
376 path: ActiveValue::set(summary.path.clone()),
377 language_server_id: ActiveValue::set(summary.language_server_id as i64),
378 error_count: ActiveValue::set(summary.error_count as i32),
379 warning_count: ActiveValue::set(summary.warning_count as i32),
380 ..Default::default()
381 })
382 .on_conflict(
383 OnConflict::columns([
384 worktree_diagnostic_summary::Column::ProjectId,
385 worktree_diagnostic_summary::Column::WorktreeId,
386 worktree_diagnostic_summary::Column::Path,
387 ])
388 .update_columns([
389 worktree_diagnostic_summary::Column::LanguageServerId,
390 worktree_diagnostic_summary::Column::ErrorCount,
391 worktree_diagnostic_summary::Column::WarningCount,
392 ])
393 .to_owned(),
394 )
395 .exec(&*tx)
396 .await?;
397
398 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
399 Ok(connection_ids)
400 })
401 .await
402 }
403
404 pub async fn start_language_server(
405 &self,
406 update: &proto::StartLanguageServer,
407 connection: ConnectionId,
408 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
409 let project_id = ProjectId::from_proto(update.project_id);
410 let room_id = self.room_id_for_project(project_id).await?;
411 self.room_transaction(room_id, |tx| async move {
412 let server = update
413 .server
414 .as_ref()
415 .ok_or_else(|| anyhow!("invalid language server"))?;
416
417 // Ensure the update comes from the host.
418 let project = project::Entity::find_by_id(project_id)
419 .one(&*tx)
420 .await?
421 .ok_or_else(|| anyhow!("no such project"))?;
422 if project.host_connection()? != connection {
423 return Err(anyhow!("can't update a project hosted by someone else"))?;
424 }
425
426 // Add the newly-started language server.
427 language_server::Entity::insert(language_server::ActiveModel {
428 project_id: ActiveValue::set(project_id),
429 id: ActiveValue::set(server.id as i64),
430 name: ActiveValue::set(server.name.clone()),
431 ..Default::default()
432 })
433 .on_conflict(
434 OnConflict::columns([
435 language_server::Column::ProjectId,
436 language_server::Column::Id,
437 ])
438 .update_column(language_server::Column::Name)
439 .to_owned(),
440 )
441 .exec(&*tx)
442 .await?;
443
444 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
445 Ok(connection_ids)
446 })
447 .await
448 }
449
450 pub async fn update_worktree_settings(
451 &self,
452 update: &proto::UpdateWorktreeSettings,
453 connection: ConnectionId,
454 ) -> Result<RoomGuard<Vec<ConnectionId>>> {
455 let project_id = ProjectId::from_proto(update.project_id);
456 let room_id = self.room_id_for_project(project_id).await?;
457 self.room_transaction(room_id, |tx| async move {
458 // Ensure the update comes from the host.
459 let project = project::Entity::find_by_id(project_id)
460 .one(&*tx)
461 .await?
462 .ok_or_else(|| anyhow!("no such project"))?;
463 if project.host_connection()? != connection {
464 return Err(anyhow!("can't update a project hosted by someone else"))?;
465 }
466
467 if let Some(content) = &update.content {
468 worktree_settings_file::Entity::insert(worktree_settings_file::ActiveModel {
469 project_id: ActiveValue::Set(project_id),
470 worktree_id: ActiveValue::Set(update.worktree_id as i64),
471 path: ActiveValue::Set(update.path.clone()),
472 content: ActiveValue::Set(content.clone()),
473 })
474 .on_conflict(
475 OnConflict::columns([
476 worktree_settings_file::Column::ProjectId,
477 worktree_settings_file::Column::WorktreeId,
478 worktree_settings_file::Column::Path,
479 ])
480 .update_column(worktree_settings_file::Column::Content)
481 .to_owned(),
482 )
483 .exec(&*tx)
484 .await?;
485 } else {
486 worktree_settings_file::Entity::delete(worktree_settings_file::ActiveModel {
487 project_id: ActiveValue::Set(project_id),
488 worktree_id: ActiveValue::Set(update.worktree_id as i64),
489 path: ActiveValue::Set(update.path.clone()),
490 ..Default::default()
491 })
492 .exec(&*tx)
493 .await?;
494 }
495
496 let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
497 Ok(connection_ids)
498 })
499 .await
500 }
501
502 pub async fn join_project(
503 &self,
504 project_id: ProjectId,
505 connection: ConnectionId,
506 ) -> Result<RoomGuard<(Project, ReplicaId)>> {
507 let room_id = self.room_id_for_project(project_id).await?;
508 self.room_transaction(room_id, |tx| async move {
509 let participant = room_participant::Entity::find()
510 .filter(
511 Condition::all()
512 .add(
513 room_participant::Column::AnsweringConnectionId
514 .eq(connection.id as i32),
515 )
516 .add(
517 room_participant::Column::AnsweringConnectionServerId
518 .eq(connection.owner_id as i32),
519 ),
520 )
521 .one(&*tx)
522 .await?
523 .ok_or_else(|| anyhow!("must join a room first"))?;
524
525 let project = project::Entity::find_by_id(project_id)
526 .one(&*tx)
527 .await?
528 .ok_or_else(|| anyhow!("no such project"))?;
529 if project.room_id != participant.room_id {
530 return Err(anyhow!("no such project"))?;
531 }
532
533 let mut collaborators = project
534 .find_related(project_collaborator::Entity)
535 .all(&*tx)
536 .await?;
537 let replica_ids = collaborators
538 .iter()
539 .map(|c| c.replica_id)
540 .collect::<HashSet<_>>();
541 let mut replica_id = ReplicaId(1);
542 while replica_ids.contains(&replica_id) {
543 replica_id.0 += 1;
544 }
545 let new_collaborator = project_collaborator::ActiveModel {
546 project_id: ActiveValue::set(project_id),
547 connection_id: ActiveValue::set(connection.id as i32),
548 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
549 user_id: ActiveValue::set(participant.user_id),
550 replica_id: ActiveValue::set(replica_id),
551 is_host: ActiveValue::set(false),
552 ..Default::default()
553 }
554 .insert(&*tx)
555 .await?;
556 collaborators.push(new_collaborator);
557
558 let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
559 let mut worktrees = db_worktrees
560 .into_iter()
561 .map(|db_worktree| {
562 (
563 db_worktree.id as u64,
564 Worktree {
565 id: db_worktree.id as u64,
566 abs_path: db_worktree.abs_path,
567 root_name: db_worktree.root_name,
568 visible: db_worktree.visible,
569 entries: Default::default(),
570 repository_entries: Default::default(),
571 diagnostic_summaries: Default::default(),
572 settings_files: Default::default(),
573 scan_id: db_worktree.scan_id as u64,
574 completed_scan_id: db_worktree.completed_scan_id as u64,
575 },
576 )
577 })
578 .collect::<BTreeMap<_, _>>();
579
580 // Populate worktree entries.
581 {
582 let mut db_entries = worktree_entry::Entity::find()
583 .filter(
584 Condition::all()
585 .add(worktree_entry::Column::ProjectId.eq(project_id))
586 .add(worktree_entry::Column::IsDeleted.eq(false)),
587 )
588 .stream(&*tx)
589 .await?;
590 while let Some(db_entry) = db_entries.next().await {
591 let db_entry = db_entry?;
592 if let Some(worktree) = worktrees.get_mut(&(db_entry.worktree_id as u64)) {
593 worktree.entries.push(proto::Entry {
594 id: db_entry.id as u64,
595 is_dir: db_entry.is_dir,
596 path: db_entry.path,
597 inode: db_entry.inode as u64,
598 mtime: Some(proto::Timestamp {
599 seconds: db_entry.mtime_seconds as u64,
600 nanos: db_entry.mtime_nanos as u32,
601 }),
602 is_symlink: db_entry.is_symlink,
603 is_ignored: db_entry.is_ignored,
604 is_external: db_entry.is_external,
605 git_status: db_entry.git_status.map(|status| status as i32),
606 });
607 }
608 }
609 }
610
611 // Populate repository entries.
612 {
613 let mut db_repository_entries = worktree_repository::Entity::find()
614 .filter(
615 Condition::all()
616 .add(worktree_repository::Column::ProjectId.eq(project_id))
617 .add(worktree_repository::Column::IsDeleted.eq(false)),
618 )
619 .stream(&*tx)
620 .await?;
621 while let Some(db_repository_entry) = db_repository_entries.next().await {
622 let db_repository_entry = db_repository_entry?;
623 if let Some(worktree) =
624 worktrees.get_mut(&(db_repository_entry.worktree_id as u64))
625 {
626 worktree.repository_entries.insert(
627 db_repository_entry.work_directory_id as u64,
628 proto::RepositoryEntry {
629 work_directory_id: db_repository_entry.work_directory_id as u64,
630 branch: db_repository_entry.branch,
631 },
632 );
633 }
634 }
635 }
636
637 // Populate worktree diagnostic summaries.
638 {
639 let mut db_summaries = worktree_diagnostic_summary::Entity::find()
640 .filter(worktree_diagnostic_summary::Column::ProjectId.eq(project_id))
641 .stream(&*tx)
642 .await?;
643 while let Some(db_summary) = db_summaries.next().await {
644 let db_summary = db_summary?;
645 if let Some(worktree) = worktrees.get_mut(&(db_summary.worktree_id as u64)) {
646 worktree
647 .diagnostic_summaries
648 .push(proto::DiagnosticSummary {
649 path: db_summary.path,
650 language_server_id: db_summary.language_server_id as u64,
651 error_count: db_summary.error_count as u32,
652 warning_count: db_summary.warning_count as u32,
653 });
654 }
655 }
656 }
657
658 // Populate worktree settings files
659 {
660 let mut db_settings_files = worktree_settings_file::Entity::find()
661 .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
662 .stream(&*tx)
663 .await?;
664 while let Some(db_settings_file) = db_settings_files.next().await {
665 let db_settings_file = db_settings_file?;
666 if let Some(worktree) =
667 worktrees.get_mut(&(db_settings_file.worktree_id as u64))
668 {
669 worktree.settings_files.push(WorktreeSettingsFile {
670 path: db_settings_file.path,
671 content: db_settings_file.content,
672 });
673 }
674 }
675 }
676
677 // Populate language servers.
678 let language_servers = project
679 .find_related(language_server::Entity)
680 .all(&*tx)
681 .await?;
682
683 let project = Project {
684 collaborators: collaborators
685 .into_iter()
686 .map(|collaborator| ProjectCollaborator {
687 connection_id: collaborator.connection(),
688 user_id: collaborator.user_id,
689 replica_id: collaborator.replica_id,
690 is_host: collaborator.is_host,
691 })
692 .collect(),
693 worktrees,
694 language_servers: language_servers
695 .into_iter()
696 .map(|language_server| proto::LanguageServer {
697 id: language_server.id as u64,
698 name: language_server.name,
699 })
700 .collect(),
701 };
702 Ok((project, replica_id as ReplicaId))
703 })
704 .await
705 }
706
707 pub async fn leave_project(
708 &self,
709 project_id: ProjectId,
710 connection: ConnectionId,
711 ) -> Result<RoomGuard<(proto::Room, LeftProject)>> {
712 let room_id = self.room_id_for_project(project_id).await?;
713 self.room_transaction(room_id, |tx| async move {
714 let result = project_collaborator::Entity::delete_many()
715 .filter(
716 Condition::all()
717 .add(project_collaborator::Column::ProjectId.eq(project_id))
718 .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32))
719 .add(
720 project_collaborator::Column::ConnectionServerId
721 .eq(connection.owner_id as i32),
722 ),
723 )
724 .exec(&*tx)
725 .await?;
726 if result.rows_affected == 0 {
727 Err(anyhow!("not a collaborator on this project"))?;
728 }
729
730 let project = project::Entity::find_by_id(project_id)
731 .one(&*tx)
732 .await?
733 .ok_or_else(|| anyhow!("no such project"))?;
734 let collaborators = project
735 .find_related(project_collaborator::Entity)
736 .all(&*tx)
737 .await?;
738 let connection_ids = collaborators
739 .into_iter()
740 .map(|collaborator| collaborator.connection())
741 .collect();
742
743 follower::Entity::delete_many()
744 .filter(
745 Condition::any()
746 .add(
747 Condition::all()
748 .add(follower::Column::ProjectId.eq(Some(project_id)))
749 .add(
750 follower::Column::LeaderConnectionServerId
751 .eq(connection.owner_id),
752 )
753 .add(follower::Column::LeaderConnectionId.eq(connection.id)),
754 )
755 .add(
756 Condition::all()
757 .add(follower::Column::ProjectId.eq(Some(project_id)))
758 .add(
759 follower::Column::FollowerConnectionServerId
760 .eq(connection.owner_id),
761 )
762 .add(follower::Column::FollowerConnectionId.eq(connection.id)),
763 ),
764 )
765 .exec(&*tx)
766 .await?;
767
768 let room = self.get_room(project.room_id, &tx).await?;
769 let left_project = LeftProject {
770 id: project_id,
771 host_user_id: project.host_user_id,
772 host_connection_id: project.host_connection()?,
773 connection_ids,
774 };
775 Ok((room, left_project))
776 })
777 .await
778 }
779
780 pub async fn check_user_is_project_host(
781 &self,
782 project_id: ProjectId,
783 connection_id: ConnectionId,
784 ) -> Result<()> {
785 let room_id = self.room_id_for_project(project_id).await?;
786 self.room_transaction(room_id, |tx| async move {
787 project_collaborator::Entity::find()
788 .filter(
789 Condition::all()
790 .add(project_collaborator::Column::ProjectId.eq(project_id))
791 .add(project_collaborator::Column::IsHost.eq(true))
792 .add(project_collaborator::Column::ConnectionId.eq(connection_id.id))
793 .add(
794 project_collaborator::Column::ConnectionServerId
795 .eq(connection_id.owner_id),
796 ),
797 )
798 .one(&*tx)
799 .await?
800 .ok_or_else(|| anyhow!("failed to read project host"))?;
801
802 Ok(())
803 })
804 .await
805 .map(|guard| guard.into_inner())
806 }
807
808 pub async fn host_for_read_only_project_request(
809 &self,
810 project_id: ProjectId,
811 connection_id: ConnectionId,
812 ) -> Result<ConnectionId> {
813 let room_id = self.room_id_for_project(project_id).await?;
814 self.room_transaction(room_id, |tx| async move {
815 let current_participant = room_participant::Entity::find()
816 .filter(room_participant::Column::RoomId.eq(room_id))
817 .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
818 .one(&*tx)
819 .await?
820 .ok_or_else(|| anyhow!("no such room"))?;
821
822 if !current_participant
823 .role
824 .map_or(false, |role| role.can_read_projects())
825 {
826 Err(anyhow!("not authorized to read projects"))?;
827 }
828
829 let host = project_collaborator::Entity::find()
830 .filter(
831 project_collaborator::Column::ProjectId
832 .eq(project_id)
833 .and(project_collaborator::Column::IsHost.eq(true)),
834 )
835 .one(&*tx)
836 .await?
837 .ok_or_else(|| anyhow!("failed to read project host"))?;
838
839 Ok(host.connection())
840 })
841 .await
842 .map(|guard| guard.into_inner())
843 }
844
845 pub async fn host_for_mutating_project_request(
846 &self,
847 project_id: ProjectId,
848 connection_id: ConnectionId,
849 ) -> Result<ConnectionId> {
850 let room_id = self.room_id_for_project(project_id).await?;
851 self.room_transaction(room_id, |tx| async move {
852 let current_participant = room_participant::Entity::find()
853 .filter(room_participant::Column::RoomId.eq(room_id))
854 .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
855 .one(&*tx)
856 .await?
857 .ok_or_else(|| anyhow!("no such room"))?;
858
859 if !current_participant
860 .role
861 .map_or(false, |role| role.can_edit_projects())
862 {
863 Err(anyhow!("not authorized to edit projects"))?;
864 }
865
866 let host = project_collaborator::Entity::find()
867 .filter(
868 project_collaborator::Column::ProjectId
869 .eq(project_id)
870 .and(project_collaborator::Column::IsHost.eq(true)),
871 )
872 .one(&*tx)
873 .await?
874 .ok_or_else(|| anyhow!("failed to read project host"))?;
875
876 Ok(host.connection())
877 })
878 .await
879 .map(|guard| guard.into_inner())
880 }
881
882 pub async fn project_collaborators_for_buffer_update(
883 &self,
884 project_id: ProjectId,
885 connection_id: ConnectionId,
886 ) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
887 let room_id = self.room_id_for_project(project_id).await?;
888 self.room_transaction(room_id, |tx| async move {
889 let current_participant = room_participant::Entity::find()
890 .filter(room_participant::Column::RoomId.eq(room_id))
891 .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.id))
892 .one(&*tx)
893 .await?
894 .ok_or_else(|| anyhow!("no such room"))?;
895
896 if !current_participant
897 .role
898 .map_or(false, |role| role.can_edit_projects())
899 {
900 Err(anyhow!("not authorized to edit projects"))?;
901 }
902
903 let collaborators = project_collaborator::Entity::find()
904 .filter(project_collaborator::Column::ProjectId.eq(project_id))
905 .all(&*tx)
906 .await?
907 .into_iter()
908 .map(|collaborator| ProjectCollaborator {
909 connection_id: collaborator.connection(),
910 user_id: collaborator.user_id,
911 replica_id: collaborator.replica_id,
912 is_host: collaborator.is_host,
913 })
914 .collect::<Vec<_>>();
915
916 if collaborators
917 .iter()
918 .any(|collaborator| collaborator.connection_id == connection_id)
919 {
920 Ok(collaborators)
921 } else {
922 Err(anyhow!("no such project"))?
923 }
924 })
925 .await
926 }
927
928 pub async fn project_connection_ids(
929 &self,
930 project_id: ProjectId,
931 connection_id: ConnectionId,
932 ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
933 let room_id = self.room_id_for_project(project_id).await?;
934 self.room_transaction(room_id, |tx| async move {
935 let mut collaborators = project_collaborator::Entity::find()
936 .filter(project_collaborator::Column::ProjectId.eq(project_id))
937 .stream(&*tx)
938 .await?;
939
940 let mut connection_ids = HashSet::default();
941 while let Some(collaborator) = collaborators.next().await {
942 let collaborator = collaborator?;
943 connection_ids.insert(collaborator.connection());
944 }
945
946 if connection_ids.contains(&connection_id) {
947 Ok(connection_ids)
948 } else {
949 Err(anyhow!("no such project"))?
950 }
951 })
952 .await
953 }
954
955 async fn project_guest_connection_ids(
956 &self,
957 project_id: ProjectId,
958 tx: &DatabaseTransaction,
959 ) -> Result<Vec<ConnectionId>> {
960 let mut collaborators = project_collaborator::Entity::find()
961 .filter(
962 project_collaborator::Column::ProjectId
963 .eq(project_id)
964 .and(project_collaborator::Column::IsHost.eq(false)),
965 )
966 .stream(tx)
967 .await?;
968
969 let mut guest_connection_ids = Vec::new();
970 while let Some(collaborator) = collaborators.next().await {
971 let collaborator = collaborator?;
972 guest_connection_ids.push(collaborator.connection());
973 }
974 Ok(guest_connection_ids)
975 }
976
977 pub async fn room_id_for_project(&self, project_id: ProjectId) -> Result<RoomId> {
978 self.transaction(|tx| async move {
979 let project = project::Entity::find_by_id(project_id)
980 .one(&*tx)
981 .await?
982 .ok_or_else(|| anyhow!("project {} not found", project_id))?;
983 Ok(project.room_id)
984 })
985 .await
986 }
987
988 pub async fn check_room_participants(
989 &self,
990 room_id: RoomId,
991 leader_id: ConnectionId,
992 follower_id: ConnectionId,
993 ) -> Result<()> {
994 self.transaction(|tx| async move {
995 use room_participant::Column;
996
997 let count = room_participant::Entity::find()
998 .filter(
999 Condition::all().add(Column::RoomId.eq(room_id)).add(
1000 Condition::any()
1001 .add(Column::AnsweringConnectionId.eq(leader_id.id as i32).and(
1002 Column::AnsweringConnectionServerId.eq(leader_id.owner_id as i32),
1003 ))
1004 .add(Column::AnsweringConnectionId.eq(follower_id.id as i32).and(
1005 Column::AnsweringConnectionServerId.eq(follower_id.owner_id as i32),
1006 )),
1007 ),
1008 )
1009 .count(&*tx)
1010 .await?;
1011
1012 if count < 2 {
1013 Err(anyhow!("not room participants"))?;
1014 }
1015
1016 Ok(())
1017 })
1018 .await
1019 }
1020
1021 pub async fn follow(
1022 &self,
1023 room_id: RoomId,
1024 project_id: ProjectId,
1025 leader_connection: ConnectionId,
1026 follower_connection: ConnectionId,
1027 ) -> Result<RoomGuard<proto::Room>> {
1028 self.room_transaction(room_id, |tx| async move {
1029 follower::ActiveModel {
1030 room_id: ActiveValue::set(room_id),
1031 project_id: ActiveValue::set(project_id),
1032 leader_connection_server_id: ActiveValue::set(ServerId(
1033 leader_connection.owner_id as i32,
1034 )),
1035 leader_connection_id: ActiveValue::set(leader_connection.id as i32),
1036 follower_connection_server_id: ActiveValue::set(ServerId(
1037 follower_connection.owner_id as i32,
1038 )),
1039 follower_connection_id: ActiveValue::set(follower_connection.id as i32),
1040 ..Default::default()
1041 }
1042 .insert(&*tx)
1043 .await?;
1044
1045 let room = self.get_room(room_id, &*tx).await?;
1046 Ok(room)
1047 })
1048 .await
1049 }
1050
1051 pub async fn unfollow(
1052 &self,
1053 room_id: RoomId,
1054 project_id: ProjectId,
1055 leader_connection: ConnectionId,
1056 follower_connection: ConnectionId,
1057 ) -> Result<RoomGuard<proto::Room>> {
1058 self.room_transaction(room_id, |tx| async move {
1059 follower::Entity::delete_many()
1060 .filter(
1061 Condition::all()
1062 .add(follower::Column::RoomId.eq(room_id))
1063 .add(follower::Column::ProjectId.eq(project_id))
1064 .add(
1065 follower::Column::LeaderConnectionServerId
1066 .eq(leader_connection.owner_id),
1067 )
1068 .add(follower::Column::LeaderConnectionId.eq(leader_connection.id))
1069 .add(
1070 follower::Column::FollowerConnectionServerId
1071 .eq(follower_connection.owner_id),
1072 )
1073 .add(follower::Column::FollowerConnectionId.eq(follower_connection.id)),
1074 )
1075 .exec(&*tx)
1076 .await?;
1077
1078 let room = self.get_room(room_id, &*tx).await?;
1079 Ok(room)
1080 })
1081 .await
1082 }
1083}