1use super::*;
2
3impl Database {
4 pub async fn clear_stale_room_participants(
5 &self,
6 room_id: RoomId,
7 new_server_id: ServerId,
8 ) -> Result<RoomGuard<RefreshedRoom>> {
9 self.room_transaction(room_id, |tx| async move {
10 let stale_participant_filter = Condition::all()
11 .add(room_participant::Column::RoomId.eq(room_id))
12 .add(room_participant::Column::AnsweringConnectionId.is_not_null())
13 .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
14
15 let stale_participant_user_ids = room_participant::Entity::find()
16 .filter(stale_participant_filter.clone())
17 .all(&*tx)
18 .await?
19 .into_iter()
20 .map(|participant| participant.user_id)
21 .collect::<Vec<_>>();
22
23 // Delete participants who failed to reconnect and cancel their calls.
24 let mut canceled_calls_to_user_ids = Vec::new();
25 room_participant::Entity::delete_many()
26 .filter(stale_participant_filter)
27 .exec(&*tx)
28 .await?;
29 let called_participants = room_participant::Entity::find()
30 .filter(
31 Condition::all()
32 .add(
33 room_participant::Column::CallingUserId
34 .is_in(stale_participant_user_ids.iter().copied()),
35 )
36 .add(room_participant::Column::AnsweringConnectionId.is_null()),
37 )
38 .all(&*tx)
39 .await?;
40 room_participant::Entity::delete_many()
41 .filter(
42 room_participant::Column::Id
43 .is_in(called_participants.iter().map(|participant| participant.id)),
44 )
45 .exec(&*tx)
46 .await?;
47 canceled_calls_to_user_ids.extend(
48 called_participants
49 .into_iter()
50 .map(|participant| participant.user_id),
51 );
52
53 let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
54 let channel_members;
55 if let Some(channel_id) = channel_id {
56 channel_members = self
57 .get_channel_participants_internal(channel_id, &tx)
58 .await?;
59 } else {
60 channel_members = Vec::new();
61
62 // Delete the room if it becomes empty.
63 if room.participants.is_empty() {
64 project::Entity::delete_many()
65 .filter(project::Column::RoomId.eq(room_id))
66 .exec(&*tx)
67 .await?;
68 room::Entity::delete_by_id(room_id).exec(&*tx).await?;
69 }
70 };
71
72 Ok(RefreshedRoom {
73 room,
74 channel_id,
75 channel_members,
76 stale_participant_user_ids,
77 canceled_calls_to_user_ids,
78 })
79 })
80 .await
81 }
82
83 pub async fn incoming_call_for_user(
84 &self,
85 user_id: UserId,
86 ) -> Result<Option<proto::IncomingCall>> {
87 self.transaction(|tx| async move {
88 let pending_participant = room_participant::Entity::find()
89 .filter(
90 room_participant::Column::UserId
91 .eq(user_id)
92 .and(room_participant::Column::AnsweringConnectionId.is_null()),
93 )
94 .one(&*tx)
95 .await?;
96
97 if let Some(pending_participant) = pending_participant {
98 let room = self.get_room(pending_participant.room_id, &tx).await?;
99 Ok(Self::build_incoming_call(&room, user_id))
100 } else {
101 Ok(None)
102 }
103 })
104 .await
105 }
106
107 pub async fn create_room(
108 &self,
109 user_id: UserId,
110 connection: ConnectionId,
111 live_kit_room: &str,
112 release_channel: &str,
113 ) -> Result<proto::Room> {
114 self.transaction(|tx| async move {
115 let room = room::ActiveModel {
116 live_kit_room: ActiveValue::set(live_kit_room.into()),
117 enviroment: ActiveValue::set(Some(release_channel.to_string())),
118 ..Default::default()
119 }
120 .insert(&*tx)
121 .await?;
122 room_participant::ActiveModel {
123 room_id: ActiveValue::set(room.id),
124 user_id: ActiveValue::set(user_id),
125 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
126 answering_connection_server_id: ActiveValue::set(Some(ServerId(
127 connection.owner_id as i32,
128 ))),
129 answering_connection_lost: ActiveValue::set(false),
130 calling_user_id: ActiveValue::set(user_id),
131 calling_connection_id: ActiveValue::set(connection.id as i32),
132 calling_connection_server_id: ActiveValue::set(Some(ServerId(
133 connection.owner_id as i32,
134 ))),
135 participant_index: ActiveValue::set(Some(0)),
136 ..Default::default()
137 }
138 .insert(&*tx)
139 .await?;
140
141 let room = self.get_room(room.id, &tx).await?;
142 Ok(room)
143 })
144 .await
145 }
146
147 pub async fn call(
148 &self,
149 room_id: RoomId,
150 calling_user_id: UserId,
151 calling_connection: ConnectionId,
152 called_user_id: UserId,
153 initial_project_id: Option<ProjectId>,
154 ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
155 self.room_transaction(room_id, |tx| async move {
156 room_participant::ActiveModel {
157 room_id: ActiveValue::set(room_id),
158 user_id: ActiveValue::set(called_user_id),
159 answering_connection_lost: ActiveValue::set(false),
160 participant_index: ActiveValue::NotSet,
161 calling_user_id: ActiveValue::set(calling_user_id),
162 calling_connection_id: ActiveValue::set(calling_connection.id as i32),
163 calling_connection_server_id: ActiveValue::set(Some(ServerId(
164 calling_connection.owner_id as i32,
165 ))),
166 initial_project_id: ActiveValue::set(initial_project_id),
167 ..Default::default()
168 }
169 .insert(&*tx)
170 .await?;
171
172 let room = self.get_room(room_id, &tx).await?;
173 let incoming_call = Self::build_incoming_call(&room, called_user_id)
174 .ok_or_else(|| anyhow!("failed to build incoming call"))?;
175 Ok((room, incoming_call))
176 })
177 .await
178 }
179
180 pub async fn call_failed(
181 &self,
182 room_id: RoomId,
183 called_user_id: UserId,
184 ) -> Result<RoomGuard<proto::Room>> {
185 self.room_transaction(room_id, |tx| async move {
186 room_participant::Entity::delete_many()
187 .filter(
188 room_participant::Column::RoomId
189 .eq(room_id)
190 .and(room_participant::Column::UserId.eq(called_user_id)),
191 )
192 .exec(&*tx)
193 .await?;
194 let room = self.get_room(room_id, &tx).await?;
195 Ok(room)
196 })
197 .await
198 }
199
200 pub async fn decline_call(
201 &self,
202 expected_room_id: Option<RoomId>,
203 user_id: UserId,
204 ) -> Result<Option<RoomGuard<proto::Room>>> {
205 self.optional_room_transaction(|tx| async move {
206 let mut filter = Condition::all()
207 .add(room_participant::Column::UserId.eq(user_id))
208 .add(room_participant::Column::AnsweringConnectionId.is_null());
209 if let Some(room_id) = expected_room_id {
210 filter = filter.add(room_participant::Column::RoomId.eq(room_id));
211 }
212 let participant = room_participant::Entity::find()
213 .filter(filter)
214 .one(&*tx)
215 .await?;
216
217 let participant = if let Some(participant) = participant {
218 participant
219 } else if expected_room_id.is_some() {
220 return Err(anyhow!("could not find call to decline"))?;
221 } else {
222 return Ok(None);
223 };
224
225 let room_id = participant.room_id;
226 room_participant::Entity::delete(participant.into_active_model())
227 .exec(&*tx)
228 .await?;
229
230 let room = self.get_room(room_id, &tx).await?;
231 Ok(Some((room_id, room)))
232 })
233 .await
234 }
235
236 pub async fn cancel_call(
237 &self,
238 room_id: RoomId,
239 calling_connection: ConnectionId,
240 called_user_id: UserId,
241 ) -> Result<RoomGuard<proto::Room>> {
242 self.room_transaction(room_id, |tx| async move {
243 let participant = room_participant::Entity::find()
244 .filter(
245 Condition::all()
246 .add(room_participant::Column::UserId.eq(called_user_id))
247 .add(room_participant::Column::RoomId.eq(room_id))
248 .add(
249 room_participant::Column::CallingConnectionId
250 .eq(calling_connection.id as i32),
251 )
252 .add(
253 room_participant::Column::CallingConnectionServerId
254 .eq(calling_connection.owner_id as i32),
255 )
256 .add(room_participant::Column::AnsweringConnectionId.is_null()),
257 )
258 .one(&*tx)
259 .await?
260 .ok_or_else(|| anyhow!("no call to cancel"))?;
261
262 room_participant::Entity::delete(participant.into_active_model())
263 .exec(&*tx)
264 .await?;
265
266 let room = self.get_room(room_id, &tx).await?;
267 Ok(room)
268 })
269 .await
270 }
271
272 pub async fn join_room(
273 &self,
274 room_id: RoomId,
275 user_id: UserId,
276 connection: ConnectionId,
277 enviroment: &str,
278 ) -> Result<RoomGuard<JoinRoom>> {
279 self.room_transaction(room_id, |tx| async move {
280 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
281 enum QueryChannelIdAndEnviroment {
282 ChannelId,
283 Enviroment,
284 }
285
286 let (channel_id, release_channel): (Option<ChannelId>, Option<String>) =
287 room::Entity::find()
288 .select_only()
289 .column(room::Column::ChannelId)
290 .column(room::Column::Enviroment)
291 .filter(room::Column::Id.eq(room_id))
292 .into_values::<_, QueryChannelIdAndEnviroment>()
293 .one(&*tx)
294 .await?
295 .ok_or_else(|| anyhow!("no such room"))?;
296
297 if let Some(release_channel) = release_channel {
298 if &release_channel != enviroment {
299 Err(anyhow!("must join using the {} release", release_channel))?;
300 }
301 }
302
303 if channel_id.is_some() {
304 Err(anyhow!("tried to join channel call directly"))?
305 }
306
307 let participant_index = self
308 .get_next_participant_index_internal(room_id, &*tx)
309 .await?;
310
311 let result = room_participant::Entity::update_many()
312 .filter(
313 Condition::all()
314 .add(room_participant::Column::RoomId.eq(room_id))
315 .add(room_participant::Column::UserId.eq(user_id))
316 .add(room_participant::Column::AnsweringConnectionId.is_null()),
317 )
318 .set(room_participant::ActiveModel {
319 participant_index: ActiveValue::Set(Some(participant_index)),
320 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
321 answering_connection_server_id: ActiveValue::set(Some(ServerId(
322 connection.owner_id as i32,
323 ))),
324 answering_connection_lost: ActiveValue::set(false),
325 ..Default::default()
326 })
327 .exec(&*tx)
328 .await?;
329 if result.rows_affected == 0 {
330 Err(anyhow!("room does not exist or was already joined"))?;
331 }
332
333 let room = self.get_room(room_id, &tx).await?;
334 Ok(JoinRoom {
335 room,
336 channel_id: None,
337 channel_members: vec![],
338 })
339 })
340 .await
341 }
342
343 async fn get_next_participant_index_internal(
344 &self,
345 room_id: RoomId,
346 tx: &DatabaseTransaction,
347 ) -> Result<i32> {
348 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
349 enum QueryParticipantIndices {
350 ParticipantIndex,
351 }
352 let existing_participant_indices: Vec<i32> = room_participant::Entity::find()
353 .filter(
354 room_participant::Column::RoomId
355 .eq(room_id)
356 .and(room_participant::Column::ParticipantIndex.is_not_null()),
357 )
358 .select_only()
359 .column(room_participant::Column::ParticipantIndex)
360 .into_values::<_, QueryParticipantIndices>()
361 .all(&*tx)
362 .await?;
363
364 let mut participant_index = 0;
365 while existing_participant_indices.contains(&participant_index) {
366 participant_index += 1;
367 }
368
369 Ok(participant_index)
370 }
371
372 pub async fn channel_id_for_room(&self, room_id: RoomId) -> Result<Option<ChannelId>> {
373 self.transaction(|tx| async move {
374 let room: Option<room::Model> = room::Entity::find()
375 .filter(room::Column::Id.eq(room_id))
376 .one(&*tx)
377 .await?;
378
379 Ok(room.and_then(|room| room.channel_id))
380 })
381 .await
382 }
383
384 pub(crate) async fn join_channel_room_internal(
385 &self,
386 channel_id: ChannelId,
387 room_id: RoomId,
388 user_id: UserId,
389 connection: ConnectionId,
390 tx: &DatabaseTransaction,
391 ) -> Result<JoinRoom> {
392 let participant_index = self
393 .get_next_participant_index_internal(room_id, &*tx)
394 .await?;
395
396 room_participant::Entity::insert_many([room_participant::ActiveModel {
397 room_id: ActiveValue::set(room_id),
398 user_id: ActiveValue::set(user_id),
399 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
400 answering_connection_server_id: ActiveValue::set(Some(ServerId(
401 connection.owner_id as i32,
402 ))),
403 answering_connection_lost: ActiveValue::set(false),
404 calling_user_id: ActiveValue::set(user_id),
405 calling_connection_id: ActiveValue::set(connection.id as i32),
406 calling_connection_server_id: ActiveValue::set(Some(ServerId(
407 connection.owner_id as i32,
408 ))),
409 participant_index: ActiveValue::Set(Some(participant_index)),
410 ..Default::default()
411 }])
412 .on_conflict(
413 OnConflict::columns([room_participant::Column::UserId])
414 .update_columns([
415 room_participant::Column::AnsweringConnectionId,
416 room_participant::Column::AnsweringConnectionServerId,
417 room_participant::Column::AnsweringConnectionLost,
418 room_participant::Column::ParticipantIndex,
419 ])
420 .to_owned(),
421 )
422 .exec(&*tx)
423 .await?;
424
425 let room = self.get_room(room_id, &tx).await?;
426 let channel_members = self
427 .get_channel_participants_internal(channel_id, &tx)
428 .await?;
429 Ok(JoinRoom {
430 room,
431 channel_id: Some(channel_id),
432 channel_members,
433 })
434 }
435
    /// Re-attaches `user_id` to a room after a reconnect, using `connection` as the
    /// new answering connection.
    ///
    /// The room is identified by `rejoin_room.id`. Within one room transaction this:
    ///
    /// 1. Points the user's participant row at `connection`, but only if the row was
    ///    already answered and either had its connection marked lost or was answered
    ///    on a different server.
    /// 2. For every entry in `rejoin_room.reshared_projects`, moves the project's
    ///    host connection and the host collaborator row to `connection` and updates
    ///    the project's worktrees.
    /// 3. Deletes the projects in this room hosted by the user that were not reshared.
    /// 4. For every entry in `rejoin_room.rejoined_projects`, collects the worktree
    ///    state, language servers, settings files and collaborators, and moves the
    ///    user's collaborator row to `connection`.
    ///
    /// # Errors
    ///
    /// * "room does not exist or was already joined" if step 1 updates no row.
    /// * "project does not exist" / "no such project" / "host not found among
    ///   collaborators" if a reshared project is missing, is not hosted by
    ///   `user_id`, or has no host collaborator row for `user_id`.
    pub async fn rejoin_room(
        &self,
        rejoin_room: proto::RejoinRoom,
        user_id: UserId,
        connection: ConnectionId,
    ) -> Result<RoomGuard<RejoinedRoom>> {
        let room_id = RoomId::from_proto(rejoin_room.id);
        self.room_transaction(room_id, |tx| async {
            // The async block is not `move`; using `tx` by value here makes the
            // block take ownership of the transaction handle.
            let tx = tx;
            // Only re-bind a participant that had answered and whose connection was
            // lost or lives on another server.
            let participant_update = room_participant::Entity::update_many()
                .filter(
                    Condition::all()
                        .add(room_participant::Column::RoomId.eq(room_id))
                        .add(room_participant::Column::UserId.eq(user_id))
                        .add(room_participant::Column::AnsweringConnectionId.is_not_null())
                        .add(
                            Condition::any()
                                .add(room_participant::Column::AnsweringConnectionLost.eq(true))
                                .add(
                                    room_participant::Column::AnsweringConnectionServerId
                                        .ne(connection.owner_id as i32),
                                ),
                        ),
                )
                .set(room_participant::ActiveModel {
                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
                        connection.owner_id as i32,
                    ))),
                    answering_connection_lost: ActiveValue::set(false),
                    ..Default::default()
                })
                .exec(&*tx)
                .await?;
            if participant_update.rows_affected == 0 {
                return Err(anyhow!("room does not exist or was already joined"))?;
            }

            // Projects the user hosts and is sharing again from the new connection.
            let mut reshared_projects = Vec::new();
            for reshared_project in &rejoin_room.reshared_projects {
                let project_id = ProjectId::from_proto(reshared_project.project_id);
                let project = project::Entity::find_by_id(project_id)
                    .one(&*tx)
                    .await?
                    .ok_or_else(|| anyhow!("project does not exist"))?;
                // Only the host may reshare a project.
                if project.host_user_id != user_id {
                    return Err(anyhow!("no such project"))?;
                }

                let mut collaborators = project
                    .find_related(project_collaborator::Entity)
                    .all(&*tx)
                    .await?;
                // Pull the host's own row out of the list; the remaining rows are
                // reported back as the project's other collaborators.
                let host_ix = collaborators
                    .iter()
                    .position(|collaborator| {
                        collaborator.user_id == user_id && collaborator.is_host
                    })
                    .ok_or_else(|| anyhow!("host not found among collaborators"))?;
                let host = collaborators.swap_remove(host_ix);
                let old_connection_id = host.connection();

                // Point both the project and the host collaborator at the new connection.
                project::Entity::update(project::ActiveModel {
                    host_connection_id: ActiveValue::set(Some(connection.id as i32)),
                    host_connection_server_id: ActiveValue::set(Some(ServerId(
                        connection.owner_id as i32,
                    ))),
                    ..project.into_active_model()
                })
                .exec(&*tx)
                .await?;
                project_collaborator::Entity::update(project_collaborator::ActiveModel {
                    connection_id: ActiveValue::set(connection.id as i32),
                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
                    ..host.into_active_model()
                })
                .exec(&*tx)
                .await?;

                self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
                    .await?;

                reshared_projects.push(ResharedProject {
                    id: project_id,
                    old_connection_id,
                    collaborators: collaborators
                        .iter()
                        .map(|collaborator| ProjectCollaborator {
                            connection_id: collaborator.connection(),
                            user_id: collaborator.user_id,
                            replica_id: collaborator.replica_id,
                            is_host: collaborator.is_host,
                        })
                        .collect(),
                    worktrees: reshared_project.worktrees.clone(),
                });
            }

            // Drop the user's projects in this room that were not reshared above.
            project::Entity::delete_many()
                .filter(
                    Condition::all()
                        .add(project::Column::RoomId.eq(room_id))
                        .add(project::Column::HostUserId.eq(user_id))
                        .add(
                            project::Column::Id
                                .is_not_in(reshared_projects.iter().map(|project| project.id)),
                        ),
                )
                .exec(&*tx)
                .await?;

            // Projects the user is rejoining.
            let mut rejoined_projects = Vec::new();
            for rejoined_project in &rejoin_room.rejoined_projects {
                let project_id = ProjectId::from_proto(rejoined_project.id);
                // A project that no longer exists is silently skipped.
                let Some(project) = project::Entity::find_by_id(project_id).one(&*tx).await? else {
                    continue;
                };

                let mut worktrees = Vec::new();
                let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
                for db_worktree in db_worktrees {
                    let mut worktree = RejoinedWorktree {
                        id: db_worktree.id as u64,
                        abs_path: db_worktree.abs_path,
                        root_name: db_worktree.root_name,
                        visible: db_worktree.visible,
                        updated_entries: Default::default(),
                        removed_entries: Default::default(),
                        updated_repositories: Default::default(),
                        removed_repositories: Default::default(),
                        diagnostic_summaries: Default::default(),
                        settings_files: Default::default(),
                        scan_id: db_worktree.scan_id as u64,
                        completed_scan_id: db_worktree.completed_scan_id as u64,
                    };

                    // The client's own record of this worktree, if it sent one.
                    let rejoined_worktree = rejoined_project
                        .worktrees
                        .iter()
                        .find(|worktree| worktree.id == db_worktree.id as u64);

                    // File entries
                    {
                        // If the client sent this worktree, select only rows with a
                        // scan id greater than the client's; otherwise select every
                        // non-deleted row.
                        let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
                            worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
                        } else {
                            worktree_entry::Column::IsDeleted.eq(false)
                        };

                        let mut db_entries = worktree_entry::Entity::find()
                            .filter(
                                Condition::all()
                                    .add(worktree_entry::Column::ProjectId.eq(project.id))
                                    .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
                                    .add(entry_filter),
                            )
                            .stream(&*tx)
                            .await?;

                        while let Some(db_entry) = db_entries.next().await {
                            let db_entry = db_entry?;
                            if db_entry.is_deleted {
                                worktree.removed_entries.push(db_entry.id as u64);
                            } else {
                                worktree.updated_entries.push(proto::Entry {
                                    id: db_entry.id as u64,
                                    is_dir: db_entry.is_dir,
                                    path: db_entry.path,
                                    inode: db_entry.inode as u64,
                                    mtime: Some(proto::Timestamp {
                                        seconds: db_entry.mtime_seconds as u64,
                                        nanos: db_entry.mtime_nanos as u32,
                                    }),
                                    is_symlink: db_entry.is_symlink,
                                    is_ignored: db_entry.is_ignored,
                                    is_external: db_entry.is_external,
                                    git_status: db_entry.git_status.map(|status| status as i32),
                                });
                            }
                        }
                    }

                    // Repository Entries
                    {
                        // Same selection rule as for file entries above.
                        let repository_entry_filter =
                            if let Some(rejoined_worktree) = rejoined_worktree {
                                worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
                            } else {
                                worktree_repository::Column::IsDeleted.eq(false)
                            };

                        let mut db_repositories = worktree_repository::Entity::find()
                            .filter(
                                Condition::all()
                                    .add(worktree_repository::Column::ProjectId.eq(project.id))
                                    .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
                                    .add(repository_entry_filter),
                            )
                            .stream(&*tx)
                            .await?;

                        while let Some(db_repository) = db_repositories.next().await {
                            let db_repository = db_repository?;
                            if db_repository.is_deleted {
                                worktree
                                    .removed_repositories
                                    .push(db_repository.work_directory_id as u64);
                            } else {
                                worktree.updated_repositories.push(proto::RepositoryEntry {
                                    work_directory_id: db_repository.work_directory_id as u64,
                                    branch: db_repository.branch,
                                });
                            }
                        }
                    }

                    worktrees.push(worktree);
                }

                let language_servers = project
                    .find_related(language_server::Entity)
                    .all(&*tx)
                    .await?
                    .into_iter()
                    .map(|language_server| proto::LanguageServer {
                        id: language_server.id as u64,
                        name: language_server.name,
                    })
                    .collect::<Vec<_>>();

                // Attach each settings file to its worktree; files whose worktree is
                // not in the list gathered above are ignored.
                {
                    let mut db_settings_files = worktree_settings_file::Entity::find()
                        .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
                        .stream(&*tx)
                        .await?;
                    while let Some(db_settings_file) = db_settings_files.next().await {
                        let db_settings_file = db_settings_file?;
                        if let Some(worktree) = worktrees
                            .iter_mut()
                            .find(|w| w.id == db_settings_file.worktree_id as u64)
                        {
                            worktree.settings_files.push(WorktreeSettingsFile {
                                path: db_settings_file.path,
                                content: db_settings_file.content,
                            });
                        }
                    }
                }

                let mut collaborators = project
                    .find_related(project_collaborator::Entity)
                    .all(&*tx)
                    .await?;
                // Skip the project entirely if the user is not one of its collaborators.
                let self_collaborator = if let Some(self_collaborator_ix) = collaborators
                    .iter()
                    .position(|collaborator| collaborator.user_id == user_id)
                {
                    collaborators.swap_remove(self_collaborator_ix)
                } else {
                    continue;
                };
                let old_connection_id = self_collaborator.connection();
                project_collaborator::Entity::update(project_collaborator::ActiveModel {
                    connection_id: ActiveValue::set(connection.id as i32),
                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
                    ..self_collaborator.into_active_model()
                })
                .exec(&*tx)
                .await?;

                let collaborators = collaborators
                    .into_iter()
                    .map(|collaborator| ProjectCollaborator {
                        connection_id: collaborator.connection(),
                        user_id: collaborator.user_id,
                        replica_id: collaborator.replica_id,
                        is_host: collaborator.is_host,
                    })
                    .collect::<Vec<_>>();

                rejoined_projects.push(RejoinedProject {
                    id: project_id,
                    old_connection_id,
                    collaborators,
                    worktrees,
                    language_servers,
                });
            }

            let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
            let channel_members = if let Some(channel_id) = channel_id {
                self.get_channel_participants_internal(channel_id, &tx)
                    .await?
            } else {
                Vec::new()
            };

            Ok(RejoinedRoom {
                room,
                channel_id,
                channel_members,
                rejoined_projects,
                reshared_projects,
            })
        })
        .await
    }
743
744 pub async fn leave_room(
745 &self,
746 connection: ConnectionId,
747 ) -> Result<Option<RoomGuard<LeftRoom>>> {
748 self.optional_room_transaction(|tx| async move {
749 let leaving_participant = room_participant::Entity::find()
750 .filter(
751 Condition::all()
752 .add(
753 room_participant::Column::AnsweringConnectionId
754 .eq(connection.id as i32),
755 )
756 .add(
757 room_participant::Column::AnsweringConnectionServerId
758 .eq(connection.owner_id as i32),
759 ),
760 )
761 .one(&*tx)
762 .await?;
763
764 if let Some(leaving_participant) = leaving_participant {
765 // Leave room.
766 let room_id = leaving_participant.room_id;
767 room_participant::Entity::delete_by_id(leaving_participant.id)
768 .exec(&*tx)
769 .await?;
770
771 // Cancel pending calls initiated by the leaving user.
772 let called_participants = room_participant::Entity::find()
773 .filter(
774 Condition::all()
775 .add(
776 room_participant::Column::CallingUserId
777 .eq(leaving_participant.user_id),
778 )
779 .add(room_participant::Column::AnsweringConnectionId.is_null()),
780 )
781 .all(&*tx)
782 .await?;
783 room_participant::Entity::delete_many()
784 .filter(
785 room_participant::Column::Id
786 .is_in(called_participants.iter().map(|participant| participant.id)),
787 )
788 .exec(&*tx)
789 .await?;
790 let canceled_calls_to_user_ids = called_participants
791 .into_iter()
792 .map(|participant| participant.user_id)
793 .collect();
794
795 // Detect left projects.
796 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
797 enum QueryProjectIds {
798 ProjectId,
799 }
800 let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
801 .select_only()
802 .column_as(
803 project_collaborator::Column::ProjectId,
804 QueryProjectIds::ProjectId,
805 )
806 .filter(
807 Condition::all()
808 .add(
809 project_collaborator::Column::ConnectionId.eq(connection.id as i32),
810 )
811 .add(
812 project_collaborator::Column::ConnectionServerId
813 .eq(connection.owner_id as i32),
814 ),
815 )
816 .into_values::<_, QueryProjectIds>()
817 .all(&*tx)
818 .await?;
819 let mut left_projects = HashMap::default();
820 let mut collaborators = project_collaborator::Entity::find()
821 .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
822 .stream(&*tx)
823 .await?;
824 while let Some(collaborator) = collaborators.next().await {
825 let collaborator = collaborator?;
826 let left_project =
827 left_projects
828 .entry(collaborator.project_id)
829 .or_insert(LeftProject {
830 id: collaborator.project_id,
831 host_user_id: Default::default(),
832 connection_ids: Default::default(),
833 host_connection_id: Default::default(),
834 });
835
836 let collaborator_connection_id = collaborator.connection();
837 if collaborator_connection_id != connection {
838 left_project.connection_ids.push(collaborator_connection_id);
839 }
840
841 if collaborator.is_host {
842 left_project.host_user_id = collaborator.user_id;
843 left_project.host_connection_id = collaborator_connection_id;
844 }
845 }
846 drop(collaborators);
847
848 // Leave projects.
849 project_collaborator::Entity::delete_many()
850 .filter(
851 Condition::all()
852 .add(
853 project_collaborator::Column::ConnectionId.eq(connection.id as i32),
854 )
855 .add(
856 project_collaborator::Column::ConnectionServerId
857 .eq(connection.owner_id as i32),
858 ),
859 )
860 .exec(&*tx)
861 .await?;
862
863 // Unshare projects.
864 project::Entity::delete_many()
865 .filter(
866 Condition::all()
867 .add(project::Column::RoomId.eq(room_id))
868 .add(project::Column::HostConnectionId.eq(connection.id as i32))
869 .add(
870 project::Column::HostConnectionServerId
871 .eq(connection.owner_id as i32),
872 ),
873 )
874 .exec(&*tx)
875 .await?;
876
877 let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
878 let deleted = if room.participants.is_empty() {
879 let result = room::Entity::delete_by_id(room_id).exec(&*tx).await?;
880 result.rows_affected > 0
881 } else {
882 false
883 };
884
885 let channel_members = if let Some(channel_id) = channel_id {
886 self.get_channel_participants_internal(channel_id, &tx)
887 .await?
888 } else {
889 Vec::new()
890 };
891 let left_room = LeftRoom {
892 room,
893 channel_id,
894 channel_members,
895 left_projects,
896 canceled_calls_to_user_ids,
897 deleted,
898 };
899
900 if left_room.room.participants.is_empty() {
901 self.rooms.remove(&room_id);
902 }
903
904 Ok(Some((room_id, left_room)))
905 } else {
906 Ok(None)
907 }
908 })
909 .await
910 }
911
912 pub async fn update_room_participant_location(
913 &self,
914 room_id: RoomId,
915 connection: ConnectionId,
916 location: proto::ParticipantLocation,
917 ) -> Result<RoomGuard<proto::Room>> {
918 self.room_transaction(room_id, |tx| async {
919 let tx = tx;
920 let location_kind;
921 let location_project_id;
922 match location
923 .variant
924 .as_ref()
925 .ok_or_else(|| anyhow!("invalid location"))?
926 {
927 proto::participant_location::Variant::SharedProject(project) => {
928 location_kind = 0;
929 location_project_id = Some(ProjectId::from_proto(project.id));
930 }
931 proto::participant_location::Variant::UnsharedProject(_) => {
932 location_kind = 1;
933 location_project_id = None;
934 }
935 proto::participant_location::Variant::External(_) => {
936 location_kind = 2;
937 location_project_id = None;
938 }
939 }
940
941 let result = room_participant::Entity::update_many()
942 .filter(
943 Condition::all()
944 .add(room_participant::Column::RoomId.eq(room_id))
945 .add(
946 room_participant::Column::AnsweringConnectionId
947 .eq(connection.id as i32),
948 )
949 .add(
950 room_participant::Column::AnsweringConnectionServerId
951 .eq(connection.owner_id as i32),
952 ),
953 )
954 .set(room_participant::ActiveModel {
955 location_kind: ActiveValue::set(Some(location_kind)),
956 location_project_id: ActiveValue::set(location_project_id),
957 ..Default::default()
958 })
959 .exec(&*tx)
960 .await?;
961
962 if result.rows_affected == 1 {
963 let room = self.get_room(room_id, &tx).await?;
964 Ok(room)
965 } else {
966 Err(anyhow!("could not update room participant location"))?
967 }
968 })
969 .await
970 }
971
    /// Runs the connection-lost handling for `connection` inside one transaction.
    ///
    /// Calls, in order, the room, channel-buffer and channel-chat handlers; an
    /// error from any of them is propagated with `?` before the later ones run.
    pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
        self.transaction(|tx| async move {
            self.room_connection_lost(connection, &*tx).await?;
            self.channel_buffer_connection_lost(connection, &*tx)
                .await?;
            self.channel_chat_connection_lost(connection, &*tx).await?;
            Ok(())
        })
        .await
    }
982
983 pub async fn room_connection_lost(
984 &self,
985 connection: ConnectionId,
986 tx: &DatabaseTransaction,
987 ) -> Result<()> {
988 let participant = room_participant::Entity::find()
989 .filter(
990 Condition::all()
991 .add(room_participant::Column::AnsweringConnectionId.eq(connection.id as i32))
992 .add(
993 room_participant::Column::AnsweringConnectionServerId
994 .eq(connection.owner_id as i32),
995 ),
996 )
997 .one(&*tx)
998 .await?;
999
1000 if let Some(participant) = participant {
1001 room_participant::Entity::update(room_participant::ActiveModel {
1002 answering_connection_lost: ActiveValue::set(true),
1003 ..participant.into_active_model()
1004 })
1005 .exec(&*tx)
1006 .await?;
1007 }
1008 Ok(())
1009 }
1010
1011 fn build_incoming_call(
1012 room: &proto::Room,
1013 called_user_id: UserId,
1014 ) -> Option<proto::IncomingCall> {
1015 let pending_participant = room
1016 .pending_participants
1017 .iter()
1018 .find(|participant| participant.user_id == called_user_id.to_proto())?;
1019
1020 Some(proto::IncomingCall {
1021 room_id: room.id,
1022 calling_user_id: pending_participant.calling_user_id,
1023 participant_user_ids: room
1024 .participants
1025 .iter()
1026 .map(|participant| participant.user_id)
1027 .collect(),
1028 initial_project: room.participants.iter().find_map(|participant| {
1029 let initial_project_id = pending_participant.initial_project_id?;
1030 participant
1031 .projects
1032 .iter()
1033 .find(|project| project.id == initial_project_id)
1034 .cloned()
1035 }),
1036 })
1037 }
1038
1039 pub async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1040 let (_, room) = self.get_channel_room(room_id, tx).await?;
1041 Ok(room)
1042 }
1043
1044 pub async fn room_connection_ids(
1045 &self,
1046 room_id: RoomId,
1047 connection_id: ConnectionId,
1048 ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
1049 self.room_transaction(room_id, |tx| async move {
1050 let mut participants = room_participant::Entity::find()
1051 .filter(room_participant::Column::RoomId.eq(room_id))
1052 .stream(&*tx)
1053 .await?;
1054
1055 let mut is_participant = false;
1056 let mut connection_ids = HashSet::default();
1057 while let Some(participant) = participants.next().await {
1058 let participant = participant?;
1059 if let Some(answering_connection) = participant.answering_connection() {
1060 if answering_connection == connection_id {
1061 is_participant = true;
1062 } else {
1063 connection_ids.insert(answering_connection);
1064 }
1065 }
1066 }
1067
1068 if !is_participant {
1069 Err(anyhow!("not a room participant"))?;
1070 }
1071
1072 Ok(connection_ids)
1073 })
1074 .await
1075 }
1076
    /// Loads `room_id` and assembles its `proto::Room`, returning it together with
    /// the room's channel id (if any).
    ///
    /// Participant rows that have an answering connection id, an answering server
    /// id and a participant index become active participants; all other rows
    /// become pending participants. Projects are attached to the participant whose
    /// connection is the project's host connection, and followers are listed as
    /// stored.
    ///
    /// Fails with "could not find room" when the room row does not exist.
    async fn get_channel_room(
        &self,
        room_id: RoomId,
        tx: &DatabaseTransaction,
    ) -> Result<(Option<ChannelId>, proto::Room)> {
        let db_room = room::Entity::find_by_id(room_id)
            .one(tx)
            .await?
            .ok_or_else(|| anyhow!("could not find room"))?;

        let mut db_participants = db_room
            .find_related(room_participant::Entity)
            .stream(tx)
            .await?;
        // Active participants are keyed by their answering connection so projects
        // can be matched to their host below.
        let mut participants = HashMap::default();
        let mut pending_participants = Vec::new();
        while let Some(db_participant) = db_participants.next().await {
            let db_participant = db_participant?;
            if let (
                Some(answering_connection_id),
                Some(answering_connection_server_id),
                Some(participant_index),
            ) = (
                db_participant.answering_connection_id,
                db_participant.answering_connection_server_id,
                db_participant.participant_index,
            ) {
                // Decode the stored location: kind 0 with a project id is a shared
                // project, kind 1 is an unshared project, and anything else
                // (including kind 0 without a project id) is reported as external.
                let location = match (
                    db_participant.location_kind,
                    db_participant.location_project_id,
                ) {
                    (Some(0), Some(project_id)) => {
                        Some(proto::participant_location::Variant::SharedProject(
                            proto::participant_location::SharedProject {
                                id: project_id.to_proto(),
                            },
                        ))
                    }
                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
                        Default::default(),
                    )),
                    _ => Some(proto::participant_location::Variant::External(
                        Default::default(),
                    )),
                };

                let answering_connection = ConnectionId {
                    owner_id: answering_connection_server_id.0 as u32,
                    id: answering_connection_id as u32,
                };
                participants.insert(
                    answering_connection,
                    proto::Participant {
                        user_id: db_participant.user_id.to_proto(),
                        peer_id: Some(answering_connection.into()),
                        projects: Default::default(),
                        location: Some(proto::ParticipantLocation { variant: location }),
                        participant_index: participant_index as u32,
                    },
                );
            } else {
                pending_participants.push(proto::PendingParticipant {
                    user_id: db_participant.user_id.to_proto(),
                    calling_user_id: db_participant.calling_user_id.to_proto(),
                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
                });
            }
        }
        // Release the stream before issuing the next query on `tx`.
        drop(db_participants);

        let mut db_projects = db_room
            .find_related(project::Entity)
            .find_with_related(worktree::Entity)
            .stream(tx)
            .await?;

        // Each row pairs a project with at most one worktree, so the same project
        // may appear on several rows.
        while let Some(row) = db_projects.next().await {
            let (db_project, db_worktree) = row?;
            let host_connection = db_project.host_connection()?;
            // Projects whose host connection is not an active participant are skipped.
            if let Some(participant) = participants.get_mut(&host_connection) {
                // Reuse the project entry created by an earlier row, or add a new one.
                let project = if let Some(project) = participant
                    .projects
                    .iter_mut()
                    .find(|project| project.id == db_project.id.to_proto())
                {
                    project
                } else {
                    participant.projects.push(proto::ParticipantProject {
                        id: db_project.id.to_proto(),
                        worktree_root_names: Default::default(),
                    });
                    participant.projects.last_mut().unwrap()
                };

                // Only visible worktrees contribute a root name.
                if let Some(db_worktree) = db_worktree {
                    if db_worktree.visible {
                        project.worktree_root_names.push(db_worktree.root_name);
                    }
                }
            }
        }
        drop(db_projects);

        let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
        let mut followers = Vec::new();
        while let Some(db_follower) = db_followers.next().await {
            let db_follower = db_follower?;
            followers.push(proto::Follower {
                leader_id: Some(db_follower.leader_connection().into()),
                follower_id: Some(db_follower.follower_connection().into()),
                project_id: db_follower.project_id.to_proto(),
            });
        }

        Ok((
            db_room.channel_id,
            proto::Room {
                id: db_room.id.to_proto(),
                live_kit_room: db_room.live_kit_room,
                participants: participants.into_values().collect(),
                pending_participants,
                followers,
            },
        ))
    }
1202}