1use super::*;
2
3impl Database {
4 pub async fn clear_stale_room_participants(
5 &self,
6 room_id: RoomId,
7 new_server_id: ServerId,
8 ) -> Result<RoomGuard<RefreshedRoom>> {
9 self.room_transaction(room_id, |tx| async move {
10 let stale_participant_filter = Condition::all()
11 .add(room_participant::Column::RoomId.eq(room_id))
12 .add(room_participant::Column::AnsweringConnectionId.is_not_null())
13 .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
14
15 let stale_participant_user_ids = room_participant::Entity::find()
16 .filter(stale_participant_filter.clone())
17 .all(&*tx)
18 .await?
19 .into_iter()
20 .map(|participant| participant.user_id)
21 .collect::<Vec<_>>();
22
23 // Delete participants who failed to reconnect and cancel their calls.
24 let mut canceled_calls_to_user_ids = Vec::new();
25 room_participant::Entity::delete_many()
26 .filter(stale_participant_filter)
27 .exec(&*tx)
28 .await?;
29 let called_participants = room_participant::Entity::find()
30 .filter(
31 Condition::all()
32 .add(
33 room_participant::Column::CallingUserId
34 .is_in(stale_participant_user_ids.iter().copied()),
35 )
36 .add(room_participant::Column::AnsweringConnectionId.is_null()),
37 )
38 .all(&*tx)
39 .await?;
40 room_participant::Entity::delete_many()
41 .filter(
42 room_participant::Column::Id
43 .is_in(called_participants.iter().map(|participant| participant.id)),
44 )
45 .exec(&*tx)
46 .await?;
47 canceled_calls_to_user_ids.extend(
48 called_participants
49 .into_iter()
50 .map(|participant| participant.user_id),
51 );
52
53 let (channel, room) = self.get_channel_room(room_id, &tx).await?;
54 let channel_members;
55 if let Some(channel) = &channel {
56 channel_members = self.get_channel_participants(channel, &tx).await?;
57 } else {
58 channel_members = Vec::new();
59
60 // Delete the room if it becomes empty.
61 if room.participants.is_empty() {
62 project::Entity::delete_many()
63 .filter(project::Column::RoomId.eq(room_id))
64 .exec(&*tx)
65 .await?;
66 room::Entity::delete_by_id(room_id).exec(&*tx).await?;
67 }
68 };
69
70 Ok(RefreshedRoom {
71 room,
72 channel_id: channel.map(|channel| channel.id),
73 channel_members,
74 stale_participant_user_ids,
75 canceled_calls_to_user_ids,
76 })
77 })
78 .await
79 }
80
81 pub async fn incoming_call_for_user(
82 &self,
83 user_id: UserId,
84 ) -> Result<Option<proto::IncomingCall>> {
85 self.transaction(|tx| async move {
86 let pending_participant = room_participant::Entity::find()
87 .filter(
88 room_participant::Column::UserId
89 .eq(user_id)
90 .and(room_participant::Column::AnsweringConnectionId.is_null()),
91 )
92 .one(&*tx)
93 .await?;
94
95 if let Some(pending_participant) = pending_participant {
96 let room = self.get_room(pending_participant.room_id, &tx).await?;
97 Ok(Self::build_incoming_call(&room, user_id))
98 } else {
99 Ok(None)
100 }
101 })
102 .await
103 }
104
105 pub async fn create_room(
106 &self,
107 user_id: UserId,
108 connection: ConnectionId,
109 live_kit_room: &str,
110 release_channel: &str,
111 ) -> Result<proto::Room> {
112 self.transaction(|tx| async move {
113 let room = room::ActiveModel {
114 live_kit_room: ActiveValue::set(live_kit_room.into()),
115 enviroment: ActiveValue::set(Some(release_channel.to_string())),
116 ..Default::default()
117 }
118 .insert(&*tx)
119 .await?;
120 room_participant::ActiveModel {
121 room_id: ActiveValue::set(room.id),
122 user_id: ActiveValue::set(user_id),
123 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
124 answering_connection_server_id: ActiveValue::set(Some(ServerId(
125 connection.owner_id as i32,
126 ))),
127 answering_connection_lost: ActiveValue::set(false),
128 calling_user_id: ActiveValue::set(user_id),
129 calling_connection_id: ActiveValue::set(connection.id as i32),
130 calling_connection_server_id: ActiveValue::set(Some(ServerId(
131 connection.owner_id as i32,
132 ))),
133 participant_index: ActiveValue::set(Some(0)),
134 ..Default::default()
135 }
136 .insert(&*tx)
137 .await?;
138
139 let room = self.get_room(room.id, &tx).await?;
140 Ok(room)
141 })
142 .await
143 }
144
145 pub async fn call(
146 &self,
147 room_id: RoomId,
148 calling_user_id: UserId,
149 calling_connection: ConnectionId,
150 called_user_id: UserId,
151 initial_project_id: Option<ProjectId>,
152 ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
153 self.room_transaction(room_id, |tx| async move {
154 room_participant::ActiveModel {
155 room_id: ActiveValue::set(room_id),
156 user_id: ActiveValue::set(called_user_id),
157 answering_connection_lost: ActiveValue::set(false),
158 participant_index: ActiveValue::NotSet,
159 calling_user_id: ActiveValue::set(calling_user_id),
160 calling_connection_id: ActiveValue::set(calling_connection.id as i32),
161 calling_connection_server_id: ActiveValue::set(Some(ServerId(
162 calling_connection.owner_id as i32,
163 ))),
164 initial_project_id: ActiveValue::set(initial_project_id),
165 ..Default::default()
166 }
167 .insert(&*tx)
168 .await?;
169
170 let room = self.get_room(room_id, &tx).await?;
171 let incoming_call = Self::build_incoming_call(&room, called_user_id)
172 .ok_or_else(|| anyhow!("failed to build incoming call"))?;
173 Ok((room, incoming_call))
174 })
175 .await
176 }
177
178 pub async fn call_failed(
179 &self,
180 room_id: RoomId,
181 called_user_id: UserId,
182 ) -> Result<RoomGuard<proto::Room>> {
183 self.room_transaction(room_id, |tx| async move {
184 room_participant::Entity::delete_many()
185 .filter(
186 room_participant::Column::RoomId
187 .eq(room_id)
188 .and(room_participant::Column::UserId.eq(called_user_id)),
189 )
190 .exec(&*tx)
191 .await?;
192 let room = self.get_room(room_id, &tx).await?;
193 Ok(room)
194 })
195 .await
196 }
197
198 pub async fn decline_call(
199 &self,
200 expected_room_id: Option<RoomId>,
201 user_id: UserId,
202 ) -> Result<Option<RoomGuard<proto::Room>>> {
203 self.optional_room_transaction(|tx| async move {
204 let mut filter = Condition::all()
205 .add(room_participant::Column::UserId.eq(user_id))
206 .add(room_participant::Column::AnsweringConnectionId.is_null());
207 if let Some(room_id) = expected_room_id {
208 filter = filter.add(room_participant::Column::RoomId.eq(room_id));
209 }
210 let participant = room_participant::Entity::find()
211 .filter(filter)
212 .one(&*tx)
213 .await?;
214
215 let participant = if let Some(participant) = participant {
216 participant
217 } else if expected_room_id.is_some() {
218 return Err(anyhow!("could not find call to decline"))?;
219 } else {
220 return Ok(None);
221 };
222
223 let room_id = participant.room_id;
224 room_participant::Entity::delete(participant.into_active_model())
225 .exec(&*tx)
226 .await?;
227
228 let room = self.get_room(room_id, &tx).await?;
229 Ok(Some((room_id, room)))
230 })
231 .await
232 }
233
234 pub async fn cancel_call(
235 &self,
236 room_id: RoomId,
237 calling_connection: ConnectionId,
238 called_user_id: UserId,
239 ) -> Result<RoomGuard<proto::Room>> {
240 self.room_transaction(room_id, |tx| async move {
241 let participant = room_participant::Entity::find()
242 .filter(
243 Condition::all()
244 .add(room_participant::Column::UserId.eq(called_user_id))
245 .add(room_participant::Column::RoomId.eq(room_id))
246 .add(
247 room_participant::Column::CallingConnectionId
248 .eq(calling_connection.id as i32),
249 )
250 .add(
251 room_participant::Column::CallingConnectionServerId
252 .eq(calling_connection.owner_id as i32),
253 )
254 .add(room_participant::Column::AnsweringConnectionId.is_null()),
255 )
256 .one(&*tx)
257 .await?
258 .ok_or_else(|| anyhow!("no call to cancel"))?;
259
260 room_participant::Entity::delete(participant.into_active_model())
261 .exec(&*tx)
262 .await?;
263
264 let room = self.get_room(room_id, &tx).await?;
265 Ok(room)
266 })
267 .await
268 }
269
270 pub async fn join_room(
271 &self,
272 room_id: RoomId,
273 user_id: UserId,
274 connection: ConnectionId,
275 enviroment: &str,
276 ) -> Result<RoomGuard<JoinRoom>> {
277 self.room_transaction(room_id, |tx| async move {
278 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
279 enum QueryChannelIdAndEnviroment {
280 ChannelId,
281 Enviroment,
282 }
283
284 let (channel_id, release_channel): (Option<ChannelId>, Option<String>) =
285 room::Entity::find()
286 .select_only()
287 .column(room::Column::ChannelId)
288 .column(room::Column::Enviroment)
289 .filter(room::Column::Id.eq(room_id))
290 .into_values::<_, QueryChannelIdAndEnviroment>()
291 .one(&*tx)
292 .await?
293 .ok_or_else(|| anyhow!("no such room"))?;
294
295 if let Some(release_channel) = release_channel {
296 if &release_channel != enviroment {
297 Err(anyhow!("must join using the {} release", release_channel))?;
298 }
299 }
300
301 if channel_id.is_some() {
302 Err(anyhow!("tried to join channel call directly"))?
303 }
304
305 let participant_index = self
306 .get_next_participant_index_internal(room_id, &*tx)
307 .await?;
308
309 let result = room_participant::Entity::update_many()
310 .filter(
311 Condition::all()
312 .add(room_participant::Column::RoomId.eq(room_id))
313 .add(room_participant::Column::UserId.eq(user_id))
314 .add(room_participant::Column::AnsweringConnectionId.is_null()),
315 )
316 .set(room_participant::ActiveModel {
317 participant_index: ActiveValue::Set(Some(participant_index)),
318 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
319 answering_connection_server_id: ActiveValue::set(Some(ServerId(
320 connection.owner_id as i32,
321 ))),
322 answering_connection_lost: ActiveValue::set(false),
323 ..Default::default()
324 })
325 .exec(&*tx)
326 .await?;
327 if result.rows_affected == 0 {
328 Err(anyhow!("room does not exist or was already joined"))?;
329 }
330
331 let room = self.get_room(room_id, &tx).await?;
332 Ok(JoinRoom {
333 room,
334 channel_id: None,
335 channel_members: vec![],
336 })
337 })
338 .await
339 }
340
341 async fn get_next_participant_index_internal(
342 &self,
343 room_id: RoomId,
344 tx: &DatabaseTransaction,
345 ) -> Result<i32> {
346 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
347 enum QueryParticipantIndices {
348 ParticipantIndex,
349 }
350 let existing_participant_indices: Vec<i32> = room_participant::Entity::find()
351 .filter(
352 room_participant::Column::RoomId
353 .eq(room_id)
354 .and(room_participant::Column::ParticipantIndex.is_not_null()),
355 )
356 .select_only()
357 .column(room_participant::Column::ParticipantIndex)
358 .into_values::<_, QueryParticipantIndices>()
359 .all(&*tx)
360 .await?;
361
362 let mut participant_index = 0;
363 while existing_participant_indices.contains(&participant_index) {
364 participant_index += 1;
365 }
366
367 Ok(participant_index)
368 }
369
370 pub async fn channel_id_for_room(&self, room_id: RoomId) -> Result<Option<ChannelId>> {
371 self.transaction(|tx| async move {
372 let room: Option<room::Model> = room::Entity::find()
373 .filter(room::Column::Id.eq(room_id))
374 .one(&*tx)
375 .await?;
376
377 Ok(room.and_then(|room| room.channel_id))
378 })
379 .await
380 }
381
382 pub(crate) async fn join_channel_room_internal(
383 &self,
384 room_id: RoomId,
385 user_id: UserId,
386 connection: ConnectionId,
387 tx: &DatabaseTransaction,
388 ) -> Result<JoinRoom> {
389 let participant_index = self
390 .get_next_participant_index_internal(room_id, &*tx)
391 .await?;
392
393 room_participant::Entity::insert_many([room_participant::ActiveModel {
394 room_id: ActiveValue::set(room_id),
395 user_id: ActiveValue::set(user_id),
396 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
397 answering_connection_server_id: ActiveValue::set(Some(ServerId(
398 connection.owner_id as i32,
399 ))),
400 answering_connection_lost: ActiveValue::set(false),
401 calling_user_id: ActiveValue::set(user_id),
402 calling_connection_id: ActiveValue::set(connection.id as i32),
403 calling_connection_server_id: ActiveValue::set(Some(ServerId(
404 connection.owner_id as i32,
405 ))),
406 participant_index: ActiveValue::Set(Some(participant_index)),
407 ..Default::default()
408 }])
409 .on_conflict(
410 OnConflict::columns([room_participant::Column::UserId])
411 .update_columns([
412 room_participant::Column::AnsweringConnectionId,
413 room_participant::Column::AnsweringConnectionServerId,
414 room_participant::Column::AnsweringConnectionLost,
415 room_participant::Column::ParticipantIndex,
416 ])
417 .to_owned(),
418 )
419 .exec(&*tx)
420 .await?;
421
422 let (channel, room) = self.get_channel_room(room_id, &tx).await?;
423 let channel = channel.ok_or_else(|| anyhow!("no channel for room"))?;
424 let channel_members = self.get_channel_participants(&channel, &*tx).await?;
425 Ok(JoinRoom {
426 room,
427 channel_id: Some(channel.id),
428 channel_members,
429 })
430 }
431
432 pub async fn rejoin_room(
433 &self,
434 rejoin_room: proto::RejoinRoom,
435 user_id: UserId,
436 connection: ConnectionId,
437 ) -> Result<RoomGuard<RejoinedRoom>> {
438 let room_id = RoomId::from_proto(rejoin_room.id);
439 self.room_transaction(room_id, |tx| async {
440 let tx = tx;
441 let participant_update = room_participant::Entity::update_many()
442 .filter(
443 Condition::all()
444 .add(room_participant::Column::RoomId.eq(room_id))
445 .add(room_participant::Column::UserId.eq(user_id))
446 .add(room_participant::Column::AnsweringConnectionId.is_not_null())
447 .add(
448 Condition::any()
449 .add(room_participant::Column::AnsweringConnectionLost.eq(true))
450 .add(
451 room_participant::Column::AnsweringConnectionServerId
452 .ne(connection.owner_id as i32),
453 ),
454 ),
455 )
456 .set(room_participant::ActiveModel {
457 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
458 answering_connection_server_id: ActiveValue::set(Some(ServerId(
459 connection.owner_id as i32,
460 ))),
461 answering_connection_lost: ActiveValue::set(false),
462 ..Default::default()
463 })
464 .exec(&*tx)
465 .await?;
466 if participant_update.rows_affected == 0 {
467 return Err(anyhow!("room does not exist or was already joined"))?;
468 }
469
470 let mut reshared_projects = Vec::new();
471 for reshared_project in &rejoin_room.reshared_projects {
472 let project_id = ProjectId::from_proto(reshared_project.project_id);
473 let project = project::Entity::find_by_id(project_id)
474 .one(&*tx)
475 .await?
476 .ok_or_else(|| anyhow!("project does not exist"))?;
477 if project.host_user_id != user_id {
478 return Err(anyhow!("no such project"))?;
479 }
480
481 let mut collaborators = project
482 .find_related(project_collaborator::Entity)
483 .all(&*tx)
484 .await?;
485 let host_ix = collaborators
486 .iter()
487 .position(|collaborator| {
488 collaborator.user_id == user_id && collaborator.is_host
489 })
490 .ok_or_else(|| anyhow!("host not found among collaborators"))?;
491 let host = collaborators.swap_remove(host_ix);
492 let old_connection_id = host.connection();
493
494 project::Entity::update(project::ActiveModel {
495 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
496 host_connection_server_id: ActiveValue::set(Some(ServerId(
497 connection.owner_id as i32,
498 ))),
499 ..project.into_active_model()
500 })
501 .exec(&*tx)
502 .await?;
503 project_collaborator::Entity::update(project_collaborator::ActiveModel {
504 connection_id: ActiveValue::set(connection.id as i32),
505 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
506 ..host.into_active_model()
507 })
508 .exec(&*tx)
509 .await?;
510
511 self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
512 .await?;
513
514 reshared_projects.push(ResharedProject {
515 id: project_id,
516 old_connection_id,
517 collaborators: collaborators
518 .iter()
519 .map(|collaborator| ProjectCollaborator {
520 connection_id: collaborator.connection(),
521 user_id: collaborator.user_id,
522 replica_id: collaborator.replica_id,
523 is_host: collaborator.is_host,
524 })
525 .collect(),
526 worktrees: reshared_project.worktrees.clone(),
527 });
528 }
529
530 project::Entity::delete_many()
531 .filter(
532 Condition::all()
533 .add(project::Column::RoomId.eq(room_id))
534 .add(project::Column::HostUserId.eq(user_id))
535 .add(
536 project::Column::Id
537 .is_not_in(reshared_projects.iter().map(|project| project.id)),
538 ),
539 )
540 .exec(&*tx)
541 .await?;
542
543 let mut rejoined_projects = Vec::new();
544 for rejoined_project in &rejoin_room.rejoined_projects {
545 let project_id = ProjectId::from_proto(rejoined_project.id);
546 let Some(project) = project::Entity::find_by_id(project_id).one(&*tx).await? else {
547 continue;
548 };
549
550 let mut worktrees = Vec::new();
551 let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
552 for db_worktree in db_worktrees {
553 let mut worktree = RejoinedWorktree {
554 id: db_worktree.id as u64,
555 abs_path: db_worktree.abs_path,
556 root_name: db_worktree.root_name,
557 visible: db_worktree.visible,
558 updated_entries: Default::default(),
559 removed_entries: Default::default(),
560 updated_repositories: Default::default(),
561 removed_repositories: Default::default(),
562 diagnostic_summaries: Default::default(),
563 settings_files: Default::default(),
564 scan_id: db_worktree.scan_id as u64,
565 completed_scan_id: db_worktree.completed_scan_id as u64,
566 };
567
568 let rejoined_worktree = rejoined_project
569 .worktrees
570 .iter()
571 .find(|worktree| worktree.id == db_worktree.id as u64);
572
573 // File entries
574 {
575 let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
576 worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
577 } else {
578 worktree_entry::Column::IsDeleted.eq(false)
579 };
580
581 let mut db_entries = worktree_entry::Entity::find()
582 .filter(
583 Condition::all()
584 .add(worktree_entry::Column::ProjectId.eq(project.id))
585 .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
586 .add(entry_filter),
587 )
588 .stream(&*tx)
589 .await?;
590
591 while let Some(db_entry) = db_entries.next().await {
592 let db_entry = db_entry?;
593 if db_entry.is_deleted {
594 worktree.removed_entries.push(db_entry.id as u64);
595 } else {
596 worktree.updated_entries.push(proto::Entry {
597 id: db_entry.id as u64,
598 is_dir: db_entry.is_dir,
599 path: db_entry.path,
600 inode: db_entry.inode as u64,
601 mtime: Some(proto::Timestamp {
602 seconds: db_entry.mtime_seconds as u64,
603 nanos: db_entry.mtime_nanos as u32,
604 }),
605 is_symlink: db_entry.is_symlink,
606 is_ignored: db_entry.is_ignored,
607 is_external: db_entry.is_external,
608 git_status: db_entry.git_status.map(|status| status as i32),
609 });
610 }
611 }
612 }
613
614 // Repository Entries
615 {
616 let repository_entry_filter =
617 if let Some(rejoined_worktree) = rejoined_worktree {
618 worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
619 } else {
620 worktree_repository::Column::IsDeleted.eq(false)
621 };
622
623 let mut db_repositories = worktree_repository::Entity::find()
624 .filter(
625 Condition::all()
626 .add(worktree_repository::Column::ProjectId.eq(project.id))
627 .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
628 .add(repository_entry_filter),
629 )
630 .stream(&*tx)
631 .await?;
632
633 while let Some(db_repository) = db_repositories.next().await {
634 let db_repository = db_repository?;
635 if db_repository.is_deleted {
636 worktree
637 .removed_repositories
638 .push(db_repository.work_directory_id as u64);
639 } else {
640 worktree.updated_repositories.push(proto::RepositoryEntry {
641 work_directory_id: db_repository.work_directory_id as u64,
642 branch: db_repository.branch,
643 });
644 }
645 }
646 }
647
648 worktrees.push(worktree);
649 }
650
651 let language_servers = project
652 .find_related(language_server::Entity)
653 .all(&*tx)
654 .await?
655 .into_iter()
656 .map(|language_server| proto::LanguageServer {
657 id: language_server.id as u64,
658 name: language_server.name,
659 })
660 .collect::<Vec<_>>();
661
662 {
663 let mut db_settings_files = worktree_settings_file::Entity::find()
664 .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
665 .stream(&*tx)
666 .await?;
667 while let Some(db_settings_file) = db_settings_files.next().await {
668 let db_settings_file = db_settings_file?;
669 if let Some(worktree) = worktrees
670 .iter_mut()
671 .find(|w| w.id == db_settings_file.worktree_id as u64)
672 {
673 worktree.settings_files.push(WorktreeSettingsFile {
674 path: db_settings_file.path,
675 content: db_settings_file.content,
676 });
677 }
678 }
679 }
680
681 let mut collaborators = project
682 .find_related(project_collaborator::Entity)
683 .all(&*tx)
684 .await?;
685 let self_collaborator = if let Some(self_collaborator_ix) = collaborators
686 .iter()
687 .position(|collaborator| collaborator.user_id == user_id)
688 {
689 collaborators.swap_remove(self_collaborator_ix)
690 } else {
691 continue;
692 };
693 let old_connection_id = self_collaborator.connection();
694 project_collaborator::Entity::update(project_collaborator::ActiveModel {
695 connection_id: ActiveValue::set(connection.id as i32),
696 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
697 ..self_collaborator.into_active_model()
698 })
699 .exec(&*tx)
700 .await?;
701
702 let collaborators = collaborators
703 .into_iter()
704 .map(|collaborator| ProjectCollaborator {
705 connection_id: collaborator.connection(),
706 user_id: collaborator.user_id,
707 replica_id: collaborator.replica_id,
708 is_host: collaborator.is_host,
709 })
710 .collect::<Vec<_>>();
711
712 rejoined_projects.push(RejoinedProject {
713 id: project_id,
714 old_connection_id,
715 collaborators,
716 worktrees,
717 language_servers,
718 });
719 }
720
721 let (channel, room) = self.get_channel_room(room_id, &tx).await?;
722 let channel_members = if let Some(channel) = &channel {
723 self.get_channel_participants(&channel, &tx).await?
724 } else {
725 Vec::new()
726 };
727
728 Ok(RejoinedRoom {
729 room,
730 channel_id: channel.map(|channel| channel.id),
731 channel_members,
732 rejoined_projects,
733 reshared_projects,
734 })
735 })
736 .await
737 }
738
739 pub async fn leave_room(
740 &self,
741 connection: ConnectionId,
742 ) -> Result<Option<RoomGuard<LeftRoom>>> {
743 self.optional_room_transaction(|tx| async move {
744 let leaving_participant = room_participant::Entity::find()
745 .filter(
746 Condition::all()
747 .add(
748 room_participant::Column::AnsweringConnectionId
749 .eq(connection.id as i32),
750 )
751 .add(
752 room_participant::Column::AnsweringConnectionServerId
753 .eq(connection.owner_id as i32),
754 ),
755 )
756 .one(&*tx)
757 .await?;
758
759 if let Some(leaving_participant) = leaving_participant {
760 // Leave room.
761 let room_id = leaving_participant.room_id;
762 room_participant::Entity::delete_by_id(leaving_participant.id)
763 .exec(&*tx)
764 .await?;
765
766 // Cancel pending calls initiated by the leaving user.
767 let called_participants = room_participant::Entity::find()
768 .filter(
769 Condition::all()
770 .add(
771 room_participant::Column::CallingUserId
772 .eq(leaving_participant.user_id),
773 )
774 .add(room_participant::Column::AnsweringConnectionId.is_null()),
775 )
776 .all(&*tx)
777 .await?;
778 room_participant::Entity::delete_many()
779 .filter(
780 room_participant::Column::Id
781 .is_in(called_participants.iter().map(|participant| participant.id)),
782 )
783 .exec(&*tx)
784 .await?;
785 let canceled_calls_to_user_ids = called_participants
786 .into_iter()
787 .map(|participant| participant.user_id)
788 .collect();
789
790 // Detect left projects.
791 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
792 enum QueryProjectIds {
793 ProjectId,
794 }
795 let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
796 .select_only()
797 .column_as(
798 project_collaborator::Column::ProjectId,
799 QueryProjectIds::ProjectId,
800 )
801 .filter(
802 Condition::all()
803 .add(
804 project_collaborator::Column::ConnectionId.eq(connection.id as i32),
805 )
806 .add(
807 project_collaborator::Column::ConnectionServerId
808 .eq(connection.owner_id as i32),
809 ),
810 )
811 .into_values::<_, QueryProjectIds>()
812 .all(&*tx)
813 .await?;
814 let mut left_projects = HashMap::default();
815 let mut collaborators = project_collaborator::Entity::find()
816 .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
817 .stream(&*tx)
818 .await?;
819 while let Some(collaborator) = collaborators.next().await {
820 let collaborator = collaborator?;
821 let left_project =
822 left_projects
823 .entry(collaborator.project_id)
824 .or_insert(LeftProject {
825 id: collaborator.project_id,
826 host_user_id: Default::default(),
827 connection_ids: Default::default(),
828 host_connection_id: Default::default(),
829 });
830
831 let collaborator_connection_id = collaborator.connection();
832 if collaborator_connection_id != connection {
833 left_project.connection_ids.push(collaborator_connection_id);
834 }
835
836 if collaborator.is_host {
837 left_project.host_user_id = collaborator.user_id;
838 left_project.host_connection_id = collaborator_connection_id;
839 }
840 }
841 drop(collaborators);
842
843 // Leave projects.
844 project_collaborator::Entity::delete_many()
845 .filter(
846 Condition::all()
847 .add(
848 project_collaborator::Column::ConnectionId.eq(connection.id as i32),
849 )
850 .add(
851 project_collaborator::Column::ConnectionServerId
852 .eq(connection.owner_id as i32),
853 ),
854 )
855 .exec(&*tx)
856 .await?;
857
858 // Unshare projects.
859 project::Entity::delete_many()
860 .filter(
861 Condition::all()
862 .add(project::Column::RoomId.eq(room_id))
863 .add(project::Column::HostConnectionId.eq(connection.id as i32))
864 .add(
865 project::Column::HostConnectionServerId
866 .eq(connection.owner_id as i32),
867 ),
868 )
869 .exec(&*tx)
870 .await?;
871
872 let (channel, room) = self.get_channel_room(room_id, &tx).await?;
873 let deleted = if room.participants.is_empty() {
874 let result = room::Entity::delete_by_id(room_id).exec(&*tx).await?;
875 result.rows_affected > 0
876 } else {
877 false
878 };
879
880 let channel_members = if let Some(channel) = &channel {
881 self.get_channel_participants(channel, &tx).await?
882 } else {
883 Vec::new()
884 };
885 let left_room = LeftRoom {
886 room,
887 channel_id: channel.map(|channel| channel.id),
888 channel_members,
889 left_projects,
890 canceled_calls_to_user_ids,
891 deleted,
892 };
893
894 if left_room.room.participants.is_empty() {
895 self.rooms.remove(&room_id);
896 }
897
898 Ok(Some((room_id, left_room)))
899 } else {
900 Ok(None)
901 }
902 })
903 .await
904 }
905
906 pub async fn update_room_participant_location(
907 &self,
908 room_id: RoomId,
909 connection: ConnectionId,
910 location: proto::ParticipantLocation,
911 ) -> Result<RoomGuard<proto::Room>> {
912 self.room_transaction(room_id, |tx| async {
913 let tx = tx;
914 let location_kind;
915 let location_project_id;
916 match location
917 .variant
918 .as_ref()
919 .ok_or_else(|| anyhow!("invalid location"))?
920 {
921 proto::participant_location::Variant::SharedProject(project) => {
922 location_kind = 0;
923 location_project_id = Some(ProjectId::from_proto(project.id));
924 }
925 proto::participant_location::Variant::UnsharedProject(_) => {
926 location_kind = 1;
927 location_project_id = None;
928 }
929 proto::participant_location::Variant::External(_) => {
930 location_kind = 2;
931 location_project_id = None;
932 }
933 }
934
935 let result = room_participant::Entity::update_many()
936 .filter(
937 Condition::all()
938 .add(room_participant::Column::RoomId.eq(room_id))
939 .add(
940 room_participant::Column::AnsweringConnectionId
941 .eq(connection.id as i32),
942 )
943 .add(
944 room_participant::Column::AnsweringConnectionServerId
945 .eq(connection.owner_id as i32),
946 ),
947 )
948 .set(room_participant::ActiveModel {
949 location_kind: ActiveValue::set(Some(location_kind)),
950 location_project_id: ActiveValue::set(location_project_id),
951 ..Default::default()
952 })
953 .exec(&*tx)
954 .await?;
955
956 if result.rows_affected == 1 {
957 let room = self.get_room(room_id, &tx).await?;
958 Ok(room)
959 } else {
960 Err(anyhow!("could not update room participant location"))?
961 }
962 })
963 .await
964 }
965
966 pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
967 self.transaction(|tx| async move {
968 self.room_connection_lost(connection, &*tx).await?;
969 self.channel_buffer_connection_lost(connection, &*tx)
970 .await?;
971 self.channel_chat_connection_lost(connection, &*tx).await?;
972 Ok(())
973 })
974 .await
975 }
976
977 pub async fn room_connection_lost(
978 &self,
979 connection: ConnectionId,
980 tx: &DatabaseTransaction,
981 ) -> Result<()> {
982 let participant = room_participant::Entity::find()
983 .filter(
984 Condition::all()
985 .add(room_participant::Column::AnsweringConnectionId.eq(connection.id as i32))
986 .add(
987 room_participant::Column::AnsweringConnectionServerId
988 .eq(connection.owner_id as i32),
989 ),
990 )
991 .one(&*tx)
992 .await?;
993
994 if let Some(participant) = participant {
995 room_participant::Entity::update(room_participant::ActiveModel {
996 answering_connection_lost: ActiveValue::set(true),
997 ..participant.into_active_model()
998 })
999 .exec(&*tx)
1000 .await?;
1001 }
1002 Ok(())
1003 }
1004
1005 fn build_incoming_call(
1006 room: &proto::Room,
1007 called_user_id: UserId,
1008 ) -> Option<proto::IncomingCall> {
1009 let pending_participant = room
1010 .pending_participants
1011 .iter()
1012 .find(|participant| participant.user_id == called_user_id.to_proto())?;
1013
1014 Some(proto::IncomingCall {
1015 room_id: room.id,
1016 calling_user_id: pending_participant.calling_user_id,
1017 participant_user_ids: room
1018 .participants
1019 .iter()
1020 .map(|participant| participant.user_id)
1021 .collect(),
1022 initial_project: room.participants.iter().find_map(|participant| {
1023 let initial_project_id = pending_participant.initial_project_id?;
1024 participant
1025 .projects
1026 .iter()
1027 .find(|project| project.id == initial_project_id)
1028 .cloned()
1029 }),
1030 })
1031 }
1032
1033 pub async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1034 let (_, room) = self.get_channel_room(room_id, tx).await?;
1035 Ok(room)
1036 }
1037
1038 pub async fn room_connection_ids(
1039 &self,
1040 room_id: RoomId,
1041 connection_id: ConnectionId,
1042 ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
1043 self.room_transaction(room_id, |tx| async move {
1044 let mut participants = room_participant::Entity::find()
1045 .filter(room_participant::Column::RoomId.eq(room_id))
1046 .stream(&*tx)
1047 .await?;
1048
1049 let mut is_participant = false;
1050 let mut connection_ids = HashSet::default();
1051 while let Some(participant) = participants.next().await {
1052 let participant = participant?;
1053 if let Some(answering_connection) = participant.answering_connection() {
1054 if answering_connection == connection_id {
1055 is_participant = true;
1056 } else {
1057 connection_ids.insert(answering_connection);
1058 }
1059 }
1060 }
1061
1062 if !is_participant {
1063 Err(anyhow!("not a room participant"))?;
1064 }
1065
1066 Ok(connection_ids)
1067 })
1068 .await
1069 }
1070
1071 async fn get_channel_room(
1072 &self,
1073 room_id: RoomId,
1074 tx: &DatabaseTransaction,
1075 ) -> Result<(Option<channel::Model>, proto::Room)> {
1076 let db_room = room::Entity::find_by_id(room_id)
1077 .one(tx)
1078 .await?
1079 .ok_or_else(|| anyhow!("could not find room"))?;
1080
1081 let mut db_participants = db_room
1082 .find_related(room_participant::Entity)
1083 .stream(tx)
1084 .await?;
1085 let mut participants = HashMap::default();
1086 let mut pending_participants = Vec::new();
1087 while let Some(db_participant) = db_participants.next().await {
1088 let db_participant = db_participant?;
1089 if let (
1090 Some(answering_connection_id),
1091 Some(answering_connection_server_id),
1092 Some(participant_index),
1093 ) = (
1094 db_participant.answering_connection_id,
1095 db_participant.answering_connection_server_id,
1096 db_participant.participant_index,
1097 ) {
1098 let location = match (
1099 db_participant.location_kind,
1100 db_participant.location_project_id,
1101 ) {
1102 (Some(0), Some(project_id)) => {
1103 Some(proto::participant_location::Variant::SharedProject(
1104 proto::participant_location::SharedProject {
1105 id: project_id.to_proto(),
1106 },
1107 ))
1108 }
1109 (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1110 Default::default(),
1111 )),
1112 _ => Some(proto::participant_location::Variant::External(
1113 Default::default(),
1114 )),
1115 };
1116
1117 let answering_connection = ConnectionId {
1118 owner_id: answering_connection_server_id.0 as u32,
1119 id: answering_connection_id as u32,
1120 };
1121 participants.insert(
1122 answering_connection,
1123 proto::Participant {
1124 user_id: db_participant.user_id.to_proto(),
1125 peer_id: Some(answering_connection.into()),
1126 projects: Default::default(),
1127 location: Some(proto::ParticipantLocation { variant: location }),
1128 participant_index: participant_index as u32,
1129 },
1130 );
1131 } else {
1132 pending_participants.push(proto::PendingParticipant {
1133 user_id: db_participant.user_id.to_proto(),
1134 calling_user_id: db_participant.calling_user_id.to_proto(),
1135 initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1136 });
1137 }
1138 }
1139 drop(db_participants);
1140
1141 let mut db_projects = db_room
1142 .find_related(project::Entity)
1143 .find_with_related(worktree::Entity)
1144 .stream(tx)
1145 .await?;
1146
1147 while let Some(row) = db_projects.next().await {
1148 let (db_project, db_worktree) = row?;
1149 let host_connection = db_project.host_connection()?;
1150 if let Some(participant) = participants.get_mut(&host_connection) {
1151 let project = if let Some(project) = participant
1152 .projects
1153 .iter_mut()
1154 .find(|project| project.id == db_project.id.to_proto())
1155 {
1156 project
1157 } else {
1158 participant.projects.push(proto::ParticipantProject {
1159 id: db_project.id.to_proto(),
1160 worktree_root_names: Default::default(),
1161 });
1162 participant.projects.last_mut().unwrap()
1163 };
1164
1165 if let Some(db_worktree) = db_worktree {
1166 if db_worktree.visible {
1167 project.worktree_root_names.push(db_worktree.root_name);
1168 }
1169 }
1170 }
1171 }
1172 drop(db_projects);
1173
1174 let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
1175 let mut followers = Vec::new();
1176 while let Some(db_follower) = db_followers.next().await {
1177 let db_follower = db_follower?;
1178 followers.push(proto::Follower {
1179 leader_id: Some(db_follower.leader_connection().into()),
1180 follower_id: Some(db_follower.follower_connection().into()),
1181 project_id: db_follower.project_id.to_proto(),
1182 });
1183 }
1184 drop(db_followers);
1185
1186 let channel = if let Some(channel_id) = db_room.channel_id {
1187 Some(self.get_channel_internal(channel_id, &*tx).await?)
1188 } else {
1189 None
1190 };
1191
1192 Ok((
1193 channel,
1194 proto::Room {
1195 id: db_room.id.to_proto(),
1196 live_kit_room: db_room.live_kit_room,
1197 participants: participants.into_values().collect(),
1198 pending_participants,
1199 followers,
1200 },
1201 ))
1202 }
1203}