1use super::*;
2
3impl Database {
4 pub async fn clear_stale_room_participants(
5 &self,
6 room_id: RoomId,
7 new_server_id: ServerId,
8 ) -> Result<RoomGuard<RefreshedRoom>> {
9 self.room_transaction(room_id, |tx| async move {
10 let stale_participant_filter = Condition::all()
11 .add(room_participant::Column::RoomId.eq(room_id))
12 .add(room_participant::Column::AnsweringConnectionId.is_not_null())
13 .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
14
15 let stale_participant_user_ids = room_participant::Entity::find()
16 .filter(stale_participant_filter.clone())
17 .all(&*tx)
18 .await?
19 .into_iter()
20 .map(|participant| participant.user_id)
21 .collect::<Vec<_>>();
22
23 // Delete participants who failed to reconnect and cancel their calls.
24 let mut canceled_calls_to_user_ids = Vec::new();
25 room_participant::Entity::delete_many()
26 .filter(stale_participant_filter)
27 .exec(&*tx)
28 .await?;
29 let called_participants = room_participant::Entity::find()
30 .filter(
31 Condition::all()
32 .add(
33 room_participant::Column::CallingUserId
34 .is_in(stale_participant_user_ids.iter().copied()),
35 )
36 .add(room_participant::Column::AnsweringConnectionId.is_null()),
37 )
38 .all(&*tx)
39 .await?;
40 room_participant::Entity::delete_many()
41 .filter(
42 room_participant::Column::Id
43 .is_in(called_participants.iter().map(|participant| participant.id)),
44 )
45 .exec(&*tx)
46 .await?;
47 canceled_calls_to_user_ids.extend(
48 called_participants
49 .into_iter()
50 .map(|participant| participant.user_id),
51 );
52
53 let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
54 let channel_members;
55 if let Some(channel_id) = channel_id {
56 channel_members = self.get_channel_members_internal(channel_id, &tx).await?;
57 } else {
58 channel_members = Vec::new();
59
60 // Delete the room if it becomes empty.
61 if room.participants.is_empty() {
62 project::Entity::delete_many()
63 .filter(project::Column::RoomId.eq(room_id))
64 .exec(&*tx)
65 .await?;
66 room::Entity::delete_by_id(room_id).exec(&*tx).await?;
67 }
68 };
69
70 Ok(RefreshedRoom {
71 room,
72 channel_id,
73 channel_members,
74 stale_participant_user_ids,
75 canceled_calls_to_user_ids,
76 })
77 })
78 .await
79 }
80
81 pub async fn incoming_call_for_user(
82 &self,
83 user_id: UserId,
84 ) -> Result<Option<proto::IncomingCall>> {
85 self.transaction(|tx| async move {
86 let pending_participant = room_participant::Entity::find()
87 .filter(
88 room_participant::Column::UserId
89 .eq(user_id)
90 .and(room_participant::Column::AnsweringConnectionId.is_null()),
91 )
92 .one(&*tx)
93 .await?;
94
95 if let Some(pending_participant) = pending_participant {
96 let room = self.get_room(pending_participant.room_id, &tx).await?;
97 Ok(Self::build_incoming_call(&room, user_id))
98 } else {
99 Ok(None)
100 }
101 })
102 .await
103 }
104
105 pub async fn create_room(
106 &self,
107 user_id: UserId,
108 connection: ConnectionId,
109 live_kit_room: &str,
110 ) -> Result<proto::Room> {
111 self.transaction(|tx| async move {
112 let room = room::ActiveModel {
113 live_kit_room: ActiveValue::set(live_kit_room.into()),
114 ..Default::default()
115 }
116 .insert(&*tx)
117 .await?;
118 room_participant::ActiveModel {
119 room_id: ActiveValue::set(room.id),
120 user_id: ActiveValue::set(user_id),
121 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
122 answering_connection_server_id: ActiveValue::set(Some(ServerId(
123 connection.owner_id as i32,
124 ))),
125 answering_connection_lost: ActiveValue::set(false),
126 calling_user_id: ActiveValue::set(user_id),
127 calling_connection_id: ActiveValue::set(connection.id as i32),
128 calling_connection_server_id: ActiveValue::set(Some(ServerId(
129 connection.owner_id as i32,
130 ))),
131 participant_index: ActiveValue::set(Some(0)),
132 ..Default::default()
133 }
134 .insert(&*tx)
135 .await?;
136
137 let room = self.get_room(room.id, &tx).await?;
138 Ok(room)
139 })
140 .await
141 }
142
143 pub async fn call(
144 &self,
145 room_id: RoomId,
146 calling_user_id: UserId,
147 calling_connection: ConnectionId,
148 called_user_id: UserId,
149 initial_project_id: Option<ProjectId>,
150 ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
151 self.room_transaction(room_id, |tx| async move {
152 room_participant::ActiveModel {
153 room_id: ActiveValue::set(room_id),
154 user_id: ActiveValue::set(called_user_id),
155 answering_connection_lost: ActiveValue::set(false),
156 participant_index: ActiveValue::NotSet,
157 calling_user_id: ActiveValue::set(calling_user_id),
158 calling_connection_id: ActiveValue::set(calling_connection.id as i32),
159 calling_connection_server_id: ActiveValue::set(Some(ServerId(
160 calling_connection.owner_id as i32,
161 ))),
162 initial_project_id: ActiveValue::set(initial_project_id),
163 ..Default::default()
164 }
165 .insert(&*tx)
166 .await?;
167
168 let room = self.get_room(room_id, &tx).await?;
169 let incoming_call = Self::build_incoming_call(&room, called_user_id)
170 .ok_or_else(|| anyhow!("failed to build incoming call"))?;
171 Ok((room, incoming_call))
172 })
173 .await
174 }
175
176 pub async fn call_failed(
177 &self,
178 room_id: RoomId,
179 called_user_id: UserId,
180 ) -> Result<RoomGuard<proto::Room>> {
181 self.room_transaction(room_id, |tx| async move {
182 room_participant::Entity::delete_many()
183 .filter(
184 room_participant::Column::RoomId
185 .eq(room_id)
186 .and(room_participant::Column::UserId.eq(called_user_id)),
187 )
188 .exec(&*tx)
189 .await?;
190 let room = self.get_room(room_id, &tx).await?;
191 Ok(room)
192 })
193 .await
194 }
195
196 pub async fn decline_call(
197 &self,
198 expected_room_id: Option<RoomId>,
199 user_id: UserId,
200 ) -> Result<Option<RoomGuard<proto::Room>>> {
201 self.optional_room_transaction(|tx| async move {
202 let mut filter = Condition::all()
203 .add(room_participant::Column::UserId.eq(user_id))
204 .add(room_participant::Column::AnsweringConnectionId.is_null());
205 if let Some(room_id) = expected_room_id {
206 filter = filter.add(room_participant::Column::RoomId.eq(room_id));
207 }
208 let participant = room_participant::Entity::find()
209 .filter(filter)
210 .one(&*tx)
211 .await?;
212
213 let participant = if let Some(participant) = participant {
214 participant
215 } else if expected_room_id.is_some() {
216 return Err(anyhow!("could not find call to decline"))?;
217 } else {
218 return Ok(None);
219 };
220
221 let room_id = participant.room_id;
222 room_participant::Entity::delete(participant.into_active_model())
223 .exec(&*tx)
224 .await?;
225
226 let room = self.get_room(room_id, &tx).await?;
227 Ok(Some((room_id, room)))
228 })
229 .await
230 }
231
232 pub async fn cancel_call(
233 &self,
234 room_id: RoomId,
235 calling_connection: ConnectionId,
236 called_user_id: UserId,
237 ) -> Result<RoomGuard<proto::Room>> {
238 self.room_transaction(room_id, |tx| async move {
239 let participant = room_participant::Entity::find()
240 .filter(
241 Condition::all()
242 .add(room_participant::Column::UserId.eq(called_user_id))
243 .add(room_participant::Column::RoomId.eq(room_id))
244 .add(
245 room_participant::Column::CallingConnectionId
246 .eq(calling_connection.id as i32),
247 )
248 .add(
249 room_participant::Column::CallingConnectionServerId
250 .eq(calling_connection.owner_id as i32),
251 )
252 .add(room_participant::Column::AnsweringConnectionId.is_null()),
253 )
254 .one(&*tx)
255 .await?
256 .ok_or_else(|| anyhow!("no call to cancel"))?;
257
258 room_participant::Entity::delete(participant.into_active_model())
259 .exec(&*tx)
260 .await?;
261
262 let room = self.get_room(room_id, &tx).await?;
263 Ok(room)
264 })
265 .await
266 }
267
268 pub async fn join_room(
269 &self,
270 room_id: RoomId,
271 user_id: UserId,
272 connection: ConnectionId,
273 ) -> Result<RoomGuard<JoinRoom>> {
274 self.room_transaction(room_id, |tx| async move {
275 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
276 enum QueryChannelId {
277 ChannelId,
278 }
279 let channel_id: Option<ChannelId> = room::Entity::find()
280 .select_only()
281 .column(room::Column::ChannelId)
282 .filter(room::Column::Id.eq(room_id))
283 .into_values::<_, QueryChannelId>()
284 .one(&*tx)
285 .await?
286 .ok_or_else(|| anyhow!("no such room"))?;
287
288 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
289 enum QueryParticipantIndices {
290 ParticipantIndex,
291 }
292 let existing_participant_indices: Vec<i32> = room_participant::Entity::find()
293 .filter(
294 room_participant::Column::RoomId
295 .eq(room_id)
296 .and(room_participant::Column::ParticipantIndex.is_not_null()),
297 )
298 .select_only()
299 .column(room_participant::Column::ParticipantIndex)
300 .into_values::<_, QueryParticipantIndices>()
301 .all(&*tx)
302 .await?;
303 let mut participant_index = 0;
304 while existing_participant_indices.contains(&participant_index) {
305 participant_index += 1;
306 }
307
308 if let Some(channel_id) = channel_id {
309 self.check_user_is_channel_member(channel_id, user_id, &*tx)
310 .await?;
311
312 room_participant::Entity::insert_many([room_participant::ActiveModel {
313 room_id: ActiveValue::set(room_id),
314 user_id: ActiveValue::set(user_id),
315 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
316 answering_connection_server_id: ActiveValue::set(Some(ServerId(
317 connection.owner_id as i32,
318 ))),
319 answering_connection_lost: ActiveValue::set(false),
320 calling_user_id: ActiveValue::set(user_id),
321 calling_connection_id: ActiveValue::set(connection.id as i32),
322 calling_connection_server_id: ActiveValue::set(Some(ServerId(
323 connection.owner_id as i32,
324 ))),
325 participant_index: ActiveValue::Set(Some(participant_index)),
326 ..Default::default()
327 }])
328 .on_conflict(
329 OnConflict::columns([room_participant::Column::UserId])
330 .update_columns([
331 room_participant::Column::AnsweringConnectionId,
332 room_participant::Column::AnsweringConnectionServerId,
333 room_participant::Column::AnsweringConnectionLost,
334 room_participant::Column::ParticipantIndex,
335 ])
336 .to_owned(),
337 )
338 .exec(&*tx)
339 .await?;
340 } else {
341 let result = room_participant::Entity::update_many()
342 .filter(
343 Condition::all()
344 .add(room_participant::Column::RoomId.eq(room_id))
345 .add(room_participant::Column::UserId.eq(user_id))
346 .add(room_participant::Column::AnsweringConnectionId.is_null()),
347 )
348 .set(room_participant::ActiveModel {
349 participant_index: ActiveValue::Set(Some(participant_index)),
350 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
351 answering_connection_server_id: ActiveValue::set(Some(ServerId(
352 connection.owner_id as i32,
353 ))),
354 answering_connection_lost: ActiveValue::set(false),
355 ..Default::default()
356 })
357 .exec(&*tx)
358 .await?;
359 if result.rows_affected == 0 {
360 Err(anyhow!("room does not exist or was already joined"))?;
361 }
362 }
363
364 let room = self.get_room(room_id, &tx).await?;
365 let channel_members = if let Some(channel_id) = channel_id {
366 self.get_channel_members_internal(channel_id, &tx).await?
367 } else {
368 Vec::new()
369 };
370 Ok(JoinRoom {
371 room,
372 channel_id,
373 channel_members,
374 })
375 })
376 .await
377 }
378
/// Re-attaches `user_id` to the room named in `rejoin_room` using the new `connection`.
///
/// Steps, all inside one room transaction:
/// 1. Point the user's answered participant row at `connection`, but only if that row was
///    marked as lost or belongs to a different server. Fails if no row was updated.
/// 2. For every project in `rejoin_room.reshared_projects`: verify the user is its host,
///    move the project and its host collaborator to `connection`, and update its worktrees.
/// 3. Delete projects of this room hosted by the user that were not reshared.
/// 4. For every project in `rejoin_room.rejoined_projects`: collect worktree, repository,
///    settings and language-server state and move the user's collaborator row to
///    `connection`. Projects that no longer exist, or in which the user is not a
///    collaborator, are skipped.
///
/// Returns the refreshed room, its channel (if any) with members, and the per-project results.
pub async fn rejoin_room(
    &self,
    rejoin_room: proto::RejoinRoom,
    user_id: UserId,
    connection: ConnectionId,
) -> Result<RoomGuard<RejoinedRoom>> {
    let room_id = RoomId::from_proto(rejoin_room.id);
    self.room_transaction(room_id, |tx| async {
        // The block is `async` rather than `async move`; rebinding moves `tx` into the
        // future while `rejoin_room` is only used through references below.
        let tx = tx;
        // Only rows that already have an answering connection and that are either flagged
        // as lost or attached to another server are eligible for the update.
        let participant_update = room_participant::Entity::update_many()
            .filter(
                Condition::all()
                    .add(room_participant::Column::RoomId.eq(room_id))
                    .add(room_participant::Column::UserId.eq(user_id))
                    .add(room_participant::Column::AnsweringConnectionId.is_not_null())
                    .add(
                        Condition::any()
                            .add(room_participant::Column::AnsweringConnectionLost.eq(true))
                            .add(
                                room_participant::Column::AnsweringConnectionServerId
                                    .ne(connection.owner_id as i32),
                            ),
                    ),
            )
            .set(room_participant::ActiveModel {
                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
                answering_connection_server_id: ActiveValue::set(Some(ServerId(
                    connection.owner_id as i32,
                ))),
                answering_connection_lost: ActiveValue::set(false),
                ..Default::default()
            })
            .exec(&*tx)
            .await?;
        if participant_update.rows_affected == 0 {
            return Err(anyhow!("room does not exist or was already joined"))?;
        }

        // Projects the user hosts and wants to keep sharing over the new connection.
        let mut reshared_projects = Vec::new();
        for reshared_project in &rejoin_room.reshared_projects {
            let project_id = ProjectId::from_proto(reshared_project.project_id);
            let project = project::Entity::find_by_id(project_id)
                .one(&*tx)
                .await?
                .ok_or_else(|| anyhow!("project does not exist"))?;
            // Only the host may reshare a project.
            if project.host_user_id != user_id {
                return Err(anyhow!("no such project"))?;
            }

            let mut collaborators = project
                .find_related(project_collaborator::Entity)
                .all(&*tx)
                .await?;
            let host_ix = collaborators
                .iter()
                .position(|collaborator| {
                    collaborator.user_id == user_id && collaborator.is_host
                })
                .ok_or_else(|| anyhow!("host not found among collaborators"))?;
            // Take the host out of the list; what remains are the other collaborators.
            let host = collaborators.swap_remove(host_ix);
            // Captured before the collaborator row is rewritten below.
            let old_connection_id = host.connection();

            project::Entity::update(project::ActiveModel {
                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
                host_connection_server_id: ActiveValue::set(Some(ServerId(
                    connection.owner_id as i32,
                ))),
                ..project.into_active_model()
            })
            .exec(&*tx)
            .await?;
            project_collaborator::Entity::update(project_collaborator::ActiveModel {
                connection_id: ActiveValue::set(connection.id as i32),
                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
                ..host.into_active_model()
            })
            .exec(&*tx)
            .await?;

            self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
                .await?;

            reshared_projects.push(ResharedProject {
                id: project_id,
                old_connection_id,
                collaborators: collaborators
                    .iter()
                    .map(|collaborator| ProjectCollaborator {
                        connection_id: collaborator.connection(),
                        user_id: collaborator.user_id,
                        replica_id: collaborator.replica_id,
                        is_host: collaborator.is_host,
                    })
                    .collect(),
                worktrees: reshared_project.worktrees.clone(),
            });
        }

        // Remove this user's hosted projects in the room that were not reshared above.
        project::Entity::delete_many()
            .filter(
                Condition::all()
                    .add(project::Column::RoomId.eq(room_id))
                    .add(project::Column::HostUserId.eq(user_id))
                    .add(
                        project::Column::Id
                            .is_not_in(reshared_projects.iter().map(|project| project.id)),
                    ),
            )
            .exec(&*tx)
            .await?;

        // Projects the user had joined as a guest and wants to keep collaborating on.
        let mut rejoined_projects = Vec::new();
        for rejoined_project in &rejoin_room.rejoined_projects {
            let project_id = ProjectId::from_proto(rejoined_project.id);
            // A project that no longer exists is silently skipped.
            let Some(project) = project::Entity::find_by_id(project_id).one(&*tx).await? else {
                continue;
            };

            let mut worktrees = Vec::new();
            let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
            for db_worktree in db_worktrees {
                let mut worktree = RejoinedWorktree {
                    id: db_worktree.id as u64,
                    abs_path: db_worktree.abs_path,
                    root_name: db_worktree.root_name,
                    visible: db_worktree.visible,
                    updated_entries: Default::default(),
                    removed_entries: Default::default(),
                    updated_repositories: Default::default(),
                    removed_repositories: Default::default(),
                    diagnostic_summaries: Default::default(),
                    settings_files: Default::default(),
                    scan_id: db_worktree.scan_id as u64,
                    completed_scan_id: db_worktree.completed_scan_id as u64,
                };

                // The client's own record of this worktree, if it reported one.
                let rejoined_worktree = rejoined_project
                    .worktrees
                    .iter()
                    .find(|worktree| worktree.id == db_worktree.id as u64);

                // File entries
                {
                    // If the client reported this worktree, select rows with a scan id
                    // greater than the one it reported (deleted rows included); otherwise
                    // select every non-deleted row.
                    let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
                        worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
                    } else {
                        worktree_entry::Column::IsDeleted.eq(false)
                    };

                    let mut db_entries = worktree_entry::Entity::find()
                        .filter(
                            Condition::all()
                                .add(worktree_entry::Column::ProjectId.eq(project.id))
                                .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
                                .add(entry_filter),
                        )
                        .stream(&*tx)
                        .await?;

                    while let Some(db_entry) = db_entries.next().await {
                        let db_entry = db_entry?;
                        if db_entry.is_deleted {
                            worktree.removed_entries.push(db_entry.id as u64);
                        } else {
                            worktree.updated_entries.push(proto::Entry {
                                id: db_entry.id as u64,
                                is_dir: db_entry.is_dir,
                                path: db_entry.path,
                                inode: db_entry.inode as u64,
                                mtime: Some(proto::Timestamp {
                                    seconds: db_entry.mtime_seconds as u64,
                                    nanos: db_entry.mtime_nanos as u32,
                                }),
                                is_symlink: db_entry.is_symlink,
                                is_ignored: db_entry.is_ignored,
                                is_external: db_entry.is_external,
                                git_status: db_entry.git_status.map(|status| status as i32),
                            });
                        }
                    }
                }

                // Repository Entries
                {
                    // Same selection rule as for file entries, applied to repositories.
                    let repository_entry_filter =
                        if let Some(rejoined_worktree) = rejoined_worktree {
                            worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
                        } else {
                            worktree_repository::Column::IsDeleted.eq(false)
                        };

                    let mut db_repositories = worktree_repository::Entity::find()
                        .filter(
                            Condition::all()
                                .add(worktree_repository::Column::ProjectId.eq(project.id))
                                .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
                                .add(repository_entry_filter),
                        )
                        .stream(&*tx)
                        .await?;

                    while let Some(db_repository) = db_repositories.next().await {
                        let db_repository = db_repository?;
                        if db_repository.is_deleted {
                            worktree
                                .removed_repositories
                                .push(db_repository.work_directory_id as u64);
                        } else {
                            worktree.updated_repositories.push(proto::RepositoryEntry {
                                work_directory_id: db_repository.work_directory_id as u64,
                                branch: db_repository.branch,
                            });
                        }
                    }
                }

                worktrees.push(worktree);
            }

            let language_servers = project
                .find_related(language_server::Entity)
                .all(&*tx)
                .await?
                .into_iter()
                .map(|language_server| proto::LanguageServer {
                    id: language_server.id as u64,
                    name: language_server.name,
                })
                .collect::<Vec<_>>();

            // Attach each settings file of the project to its worktree collected above;
            // files whose worktree is not in the list are dropped.
            {
                let mut db_settings_files = worktree_settings_file::Entity::find()
                    .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
                    .stream(&*tx)
                    .await?;
                while let Some(db_settings_file) = db_settings_files.next().await {
                    let db_settings_file = db_settings_file?;
                    if let Some(worktree) = worktrees
                        .iter_mut()
                        .find(|w| w.id == db_settings_file.worktree_id as u64)
                    {
                        worktree.settings_files.push(WorktreeSettingsFile {
                            path: db_settings_file.path,
                            content: db_settings_file.content,
                        });
                    }
                }
            }

            let mut collaborators = project
                .find_related(project_collaborator::Entity)
                .all(&*tx)
                .await?;
            // Skip the project entirely if the user is not one of its collaborators.
            let self_collaborator = if let Some(self_collaborator_ix) = collaborators
                .iter()
                .position(|collaborator| collaborator.user_id == user_id)
            {
                collaborators.swap_remove(self_collaborator_ix)
            } else {
                continue;
            };
            // Captured before the collaborator row is rewritten below.
            let old_connection_id = self_collaborator.connection();
            project_collaborator::Entity::update(project_collaborator::ActiveModel {
                connection_id: ActiveValue::set(connection.id as i32),
                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
                ..self_collaborator.into_active_model()
            })
            .exec(&*tx)
            .await?;

            let collaborators = collaborators
                .into_iter()
                .map(|collaborator| ProjectCollaborator {
                    connection_id: collaborator.connection(),
                    user_id: collaborator.user_id,
                    replica_id: collaborator.replica_id,
                    is_host: collaborator.is_host,
                })
                .collect::<Vec<_>>();

            rejoined_projects.push(RejoinedProject {
                id: project_id,
                old_connection_id,
                collaborators,
                worktrees,
                language_servers,
            });
        }

        // Read the room only after all of the updates above have been applied.
        let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
        let channel_members = if let Some(channel_id) = channel_id {
            self.get_channel_members_internal(channel_id, &tx).await?
        } else {
            Vec::new()
        };

        Ok(RejoinedRoom {
            room,
            channel_id,
            channel_members,
            rejoined_projects,
            reshared_projects,
        })
    })
    .await
}
685
/// Removes the participant answering on `connection` from its room.
///
/// Returns `Ok(None)` when no participant is attached to `connection`. Otherwise, in order:
/// deletes the participant, cancels the unanswered calls that user had placed, records which
/// projects the connection was collaborating on, removes the connection's collaborator rows,
/// deletes the projects in the room hosted over this connection, and finally deletes the
/// room itself if it has no participants left and is not attached to a channel.
///
/// The returned [`LeftRoom`] holds the room state read after these deletions, the affected
/// projects, the users whose calls were canceled and whether the room row was deleted.
pub async fn leave_room(
    &self,
    connection: ConnectionId,
) -> Result<Option<RoomGuard<LeftRoom>>> {
    self.optional_room_transaction(|tx| async move {
        // Find the participant by its answering connection (connection id + server id).
        let leaving_participant = room_participant::Entity::find()
            .filter(
                Condition::all()
                    .add(
                        room_participant::Column::AnsweringConnectionId
                            .eq(connection.id as i32),
                    )
                    .add(
                        room_participant::Column::AnsweringConnectionServerId
                            .eq(connection.owner_id as i32),
                    ),
            )
            .one(&*tx)
            .await?;

        if let Some(leaving_participant) = leaving_participant {
            // Leave room.
            let room_id = leaving_participant.room_id;
            room_participant::Entity::delete_by_id(leaving_participant.id)
                .exec(&*tx)
                .await?;

            // Cancel pending calls initiated by the leaving user.
            let called_participants = room_participant::Entity::find()
                .filter(
                    Condition::all()
                        .add(
                            room_participant::Column::CallingUserId
                                .eq(leaving_participant.user_id),
                        )
                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
                )
                .all(&*tx)
                .await?;
            room_participant::Entity::delete_many()
                .filter(
                    room_participant::Column::Id
                        .is_in(called_participants.iter().map(|participant| participant.id)),
                )
                .exec(&*tx)
                .await?;
            let canceled_calls_to_user_ids = called_participants
                .into_iter()
                .map(|participant| participant.user_id)
                .collect();

            // Detect left projects.
            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
            enum QueryProjectIds {
                ProjectId,
            }
            // Ids of every project this connection is a collaborator on.
            let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
                .select_only()
                .column_as(
                    project_collaborator::Column::ProjectId,
                    QueryProjectIds::ProjectId,
                )
                .filter(
                    Condition::all()
                        .add(
                            project_collaborator::Column::ConnectionId.eq(connection.id as i32),
                        )
                        .add(
                            project_collaborator::Column::ConnectionServerId
                                .eq(connection.owner_id as i32),
                        ),
                )
                .into_values::<_, QueryProjectIds>()
                .all(&*tx)
                .await?;
            // Group all collaborators of those projects by project.
            let mut left_projects = HashMap::default();
            let mut collaborators = project_collaborator::Entity::find()
                .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
                .stream(&*tx)
                .await?;
            while let Some(collaborator) = collaborators.next().await {
                let collaborator = collaborator?;
                // Host fields start at their defaults and are filled in when the host's
                // collaborator row is encountered below.
                let left_project =
                    left_projects
                        .entry(collaborator.project_id)
                        .or_insert(LeftProject {
                            id: collaborator.project_id,
                            host_user_id: Default::default(),
                            connection_ids: Default::default(),
                            host_connection_id: Default::default(),
                        });

                // Collect every collaborator connection except the one that is leaving.
                let collaborator_connection_id = collaborator.connection();
                if collaborator_connection_id != connection {
                    left_project.connection_ids.push(collaborator_connection_id);
                }

                if collaborator.is_host {
                    left_project.host_user_id = collaborator.user_id;
                    left_project.host_connection_id = collaborator_connection_id;
                }
            }
            // The stream is dropped explicitly before the statements below run on `tx`.
            drop(collaborators);

            // Leave projects.
            project_collaborator::Entity::delete_many()
                .filter(
                    Condition::all()
                        .add(
                            project_collaborator::Column::ConnectionId.eq(connection.id as i32),
                        )
                        .add(
                            project_collaborator::Column::ConnectionServerId
                                .eq(connection.owner_id as i32),
                        ),
                )
                .exec(&*tx)
                .await?;

            // Unshare projects.
            project::Entity::delete_many()
                .filter(
                    Condition::all()
                        .add(project::Column::RoomId.eq(room_id))
                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
                        .add(
                            project::Column::HostConnectionServerId
                                .eq(connection.owner_id as i32),
                        ),
                )
                .exec(&*tx)
                .await?;

            // Read the room after all deletions. An empty room is deleted only when its
            // channel id is null; `deleted` reports whether a row was actually removed.
            let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
            let deleted = if room.participants.is_empty() {
                let result = room::Entity::delete_by_id(room_id)
                    .filter(room::Column::ChannelId.is_null())
                    .exec(&*tx)
                    .await?;
                result.rows_affected > 0
            } else {
                false
            };

            let channel_members = if let Some(channel_id) = channel_id {
                self.get_channel_members_internal(channel_id, &tx).await?
            } else {
                Vec::new()
            };
            let left_room = LeftRoom {
                room,
                channel_id,
                channel_members,
                left_projects,
                canceled_calls_to_user_ids,
                deleted,
            };

            // Drop the entry for this room from `self.rooms` once nobody is left in it.
            // NOTE(review): this happens whether or not the room row was deleted — confirm
            // that is intended for channel rooms.
            if left_room.room.participants.is_empty() {
                self.rooms.remove(&room_id);
            }

            Ok(Some((room_id, left_room)))
        } else {
            Ok(None)
        }
    })
    .await
}
855
856 pub async fn update_room_participant_location(
857 &self,
858 room_id: RoomId,
859 connection: ConnectionId,
860 location: proto::ParticipantLocation,
861 ) -> Result<RoomGuard<proto::Room>> {
862 self.room_transaction(room_id, |tx| async {
863 let tx = tx;
864 let location_kind;
865 let location_project_id;
866 match location
867 .variant
868 .as_ref()
869 .ok_or_else(|| anyhow!("invalid location"))?
870 {
871 proto::participant_location::Variant::SharedProject(project) => {
872 location_kind = 0;
873 location_project_id = Some(ProjectId::from_proto(project.id));
874 }
875 proto::participant_location::Variant::UnsharedProject(_) => {
876 location_kind = 1;
877 location_project_id = None;
878 }
879 proto::participant_location::Variant::External(_) => {
880 location_kind = 2;
881 location_project_id = None;
882 }
883 }
884
885 let result = room_participant::Entity::update_many()
886 .filter(
887 Condition::all()
888 .add(room_participant::Column::RoomId.eq(room_id))
889 .add(
890 room_participant::Column::AnsweringConnectionId
891 .eq(connection.id as i32),
892 )
893 .add(
894 room_participant::Column::AnsweringConnectionServerId
895 .eq(connection.owner_id as i32),
896 ),
897 )
898 .set(room_participant::ActiveModel {
899 location_kind: ActiveValue::set(Some(location_kind)),
900 location_project_id: ActiveValue::set(location_project_id),
901 ..Default::default()
902 })
903 .exec(&*tx)
904 .await?;
905
906 if result.rows_affected == 1 {
907 let room = self.get_room(room_id, &tx).await?;
908 Ok(room)
909 } else {
910 Err(anyhow!("could not update room participant location"))?
911 }
912 })
913 .await
914 }
915
916 pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
917 self.transaction(|tx| async move {
918 self.room_connection_lost(connection, &*tx).await?;
919 self.channel_buffer_connection_lost(connection, &*tx)
920 .await?;
921 self.channel_chat_connection_lost(connection, &*tx).await?;
922 Ok(())
923 })
924 .await
925 }
926
927 pub async fn room_connection_lost(
928 &self,
929 connection: ConnectionId,
930 tx: &DatabaseTransaction,
931 ) -> Result<()> {
932 let participant = room_participant::Entity::find()
933 .filter(
934 Condition::all()
935 .add(room_participant::Column::AnsweringConnectionId.eq(connection.id as i32))
936 .add(
937 room_participant::Column::AnsweringConnectionServerId
938 .eq(connection.owner_id as i32),
939 ),
940 )
941 .one(&*tx)
942 .await?;
943
944 if let Some(participant) = participant {
945 room_participant::Entity::update(room_participant::ActiveModel {
946 answering_connection_lost: ActiveValue::set(true),
947 ..participant.into_active_model()
948 })
949 .exec(&*tx)
950 .await?;
951 }
952 Ok(())
953 }
954
955 fn build_incoming_call(
956 room: &proto::Room,
957 called_user_id: UserId,
958 ) -> Option<proto::IncomingCall> {
959 let pending_participant = room
960 .pending_participants
961 .iter()
962 .find(|participant| participant.user_id == called_user_id.to_proto())?;
963
964 Some(proto::IncomingCall {
965 room_id: room.id,
966 calling_user_id: pending_participant.calling_user_id,
967 participant_user_ids: room
968 .participants
969 .iter()
970 .map(|participant| participant.user_id)
971 .collect(),
972 initial_project: room.participants.iter().find_map(|participant| {
973 let initial_project_id = pending_participant.initial_project_id?;
974 participant
975 .projects
976 .iter()
977 .find(|project| project.id == initial_project_id)
978 .cloned()
979 }),
980 })
981 }
982
983 pub async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
984 let (_, room) = self.get_channel_room(room_id, tx).await?;
985 Ok(room)
986 }
987
988 pub async fn room_connection_ids(
989 &self,
990 room_id: RoomId,
991 connection_id: ConnectionId,
992 ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
993 self.room_transaction(room_id, |tx| async move {
994 let mut participants = room_participant::Entity::find()
995 .filter(room_participant::Column::RoomId.eq(room_id))
996 .stream(&*tx)
997 .await?;
998
999 let mut is_participant = false;
1000 let mut connection_ids = HashSet::default();
1001 while let Some(participant) = participants.next().await {
1002 let participant = participant?;
1003 if let Some(answering_connection) = participant.answering_connection() {
1004 if answering_connection == connection_id {
1005 is_participant = true;
1006 } else {
1007 connection_ids.insert(answering_connection);
1008 }
1009 }
1010 }
1011
1012 if !is_participant {
1013 Err(anyhow!("not a room participant"))?;
1014 }
1015
1016 Ok(connection_ids)
1017 })
1018 .await
1019 }
1020
1021 async fn get_channel_room(
1022 &self,
1023 room_id: RoomId,
1024 tx: &DatabaseTransaction,
1025 ) -> Result<(Option<ChannelId>, proto::Room)> {
1026 let db_room = room::Entity::find_by_id(room_id)
1027 .one(tx)
1028 .await?
1029 .ok_or_else(|| anyhow!("could not find room"))?;
1030
1031 let mut db_participants = db_room
1032 .find_related(room_participant::Entity)
1033 .stream(tx)
1034 .await?;
1035 let mut participants = HashMap::default();
1036 let mut pending_participants = Vec::new();
1037 while let Some(db_participant) = db_participants.next().await {
1038 let db_participant = db_participant?;
1039 if let (
1040 Some(answering_connection_id),
1041 Some(answering_connection_server_id),
1042 Some(participant_index),
1043 ) = (
1044 db_participant.answering_connection_id,
1045 db_participant.answering_connection_server_id,
1046 db_participant.participant_index,
1047 ) {
1048 let location = match (
1049 db_participant.location_kind,
1050 db_participant.location_project_id,
1051 ) {
1052 (Some(0), Some(project_id)) => {
1053 Some(proto::participant_location::Variant::SharedProject(
1054 proto::participant_location::SharedProject {
1055 id: project_id.to_proto(),
1056 },
1057 ))
1058 }
1059 (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1060 Default::default(),
1061 )),
1062 _ => Some(proto::participant_location::Variant::External(
1063 Default::default(),
1064 )),
1065 };
1066
1067 let answering_connection = ConnectionId {
1068 owner_id: answering_connection_server_id.0 as u32,
1069 id: answering_connection_id as u32,
1070 };
1071 participants.insert(
1072 answering_connection,
1073 proto::Participant {
1074 user_id: db_participant.user_id.to_proto(),
1075 peer_id: Some(answering_connection.into()),
1076 projects: Default::default(),
1077 location: Some(proto::ParticipantLocation { variant: location }),
1078 participant_index: participant_index as u32,
1079 },
1080 );
1081 } else {
1082 pending_participants.push(proto::PendingParticipant {
1083 user_id: db_participant.user_id.to_proto(),
1084 calling_user_id: db_participant.calling_user_id.to_proto(),
1085 initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1086 });
1087 }
1088 }
1089 drop(db_participants);
1090
1091 let mut db_projects = db_room
1092 .find_related(project::Entity)
1093 .find_with_related(worktree::Entity)
1094 .stream(tx)
1095 .await?;
1096
1097 while let Some(row) = db_projects.next().await {
1098 let (db_project, db_worktree) = row?;
1099 let host_connection = db_project.host_connection()?;
1100 if let Some(participant) = participants.get_mut(&host_connection) {
1101 let project = if let Some(project) = participant
1102 .projects
1103 .iter_mut()
1104 .find(|project| project.id == db_project.id.to_proto())
1105 {
1106 project
1107 } else {
1108 participant.projects.push(proto::ParticipantProject {
1109 id: db_project.id.to_proto(),
1110 worktree_root_names: Default::default(),
1111 });
1112 participant.projects.last_mut().unwrap()
1113 };
1114
1115 if let Some(db_worktree) = db_worktree {
1116 if db_worktree.visible {
1117 project.worktree_root_names.push(db_worktree.root_name);
1118 }
1119 }
1120 }
1121 }
1122 drop(db_projects);
1123
1124 let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
1125 let mut followers = Vec::new();
1126 while let Some(db_follower) = db_followers.next().await {
1127 let db_follower = db_follower?;
1128 followers.push(proto::Follower {
1129 leader_id: Some(db_follower.leader_connection().into()),
1130 follower_id: Some(db_follower.follower_connection().into()),
1131 project_id: db_follower.project_id.to_proto(),
1132 });
1133 }
1134
1135 Ok((
1136 db_room.channel_id,
1137 proto::Room {
1138 id: db_room.id.to_proto(),
1139 live_kit_room: db_room.live_kit_room,
1140 participants: participants.into_values().collect(),
1141 pending_participants,
1142 followers,
1143 },
1144 ))
1145 }
1146}