1use super::*;
2
3impl Database {
4 pub async fn clear_stale_room_participants(
5 &self,
6 room_id: RoomId,
7 new_server_id: ServerId,
8 ) -> Result<RoomGuard<RefreshedRoom>> {
9 self.room_transaction(room_id, |tx| async move {
10 let stale_participant_filter = Condition::all()
11 .add(room_participant::Column::RoomId.eq(room_id))
12 .add(room_participant::Column::AnsweringConnectionId.is_not_null())
13 .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
14
15 let stale_participant_user_ids = room_participant::Entity::find()
16 .filter(stale_participant_filter.clone())
17 .all(&*tx)
18 .await?
19 .into_iter()
20 .map(|participant| participant.user_id)
21 .collect::<Vec<_>>();
22
23 // Delete participants who failed to reconnect and cancel their calls.
24 let mut canceled_calls_to_user_ids = Vec::new();
25 room_participant::Entity::delete_many()
26 .filter(stale_participant_filter)
27 .exec(&*tx)
28 .await?;
29 let called_participants = room_participant::Entity::find()
30 .filter(
31 Condition::all()
32 .add(
33 room_participant::Column::CallingUserId
34 .is_in(stale_participant_user_ids.iter().copied()),
35 )
36 .add(room_participant::Column::AnsweringConnectionId.is_null()),
37 )
38 .all(&*tx)
39 .await?;
40 room_participant::Entity::delete_many()
41 .filter(
42 room_participant::Column::Id
43 .is_in(called_participants.iter().map(|participant| participant.id)),
44 )
45 .exec(&*tx)
46 .await?;
47 canceled_calls_to_user_ids.extend(
48 called_participants
49 .into_iter()
50 .map(|participant| participant.user_id),
51 );
52
53 let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
54 let channel_members;
55 if let Some(channel_id) = channel_id {
56 channel_members = self.get_channel_members_internal(channel_id, &tx).await?;
57 } else {
58 channel_members = Vec::new();
59
60 // Delete the room if it becomes empty.
61 if room.participants.is_empty() {
62 project::Entity::delete_many()
63 .filter(project::Column::RoomId.eq(room_id))
64 .exec(&*tx)
65 .await?;
66 room::Entity::delete_by_id(room_id).exec(&*tx).await?;
67 }
68 };
69
70 Ok(RefreshedRoom {
71 room,
72 channel_id,
73 channel_members,
74 stale_participant_user_ids,
75 canceled_calls_to_user_ids,
76 })
77 })
78 .await
79 }
80
81 pub async fn incoming_call_for_user(
82 &self,
83 user_id: UserId,
84 ) -> Result<Option<proto::IncomingCall>> {
85 self.transaction(|tx| async move {
86 let pending_participant = room_participant::Entity::find()
87 .filter(
88 room_participant::Column::UserId
89 .eq(user_id)
90 .and(room_participant::Column::AnsweringConnectionId.is_null()),
91 )
92 .one(&*tx)
93 .await?;
94
95 if let Some(pending_participant) = pending_participant {
96 let room = self.get_room(pending_participant.room_id, &tx).await?;
97 Ok(Self::build_incoming_call(&room, user_id))
98 } else {
99 Ok(None)
100 }
101 })
102 .await
103 }
104
105 pub async fn create_room(
106 &self,
107 user_id: UserId,
108 connection: ConnectionId,
109 live_kit_room: &str,
110 release_channel: &str,
111 ) -> Result<proto::Room> {
112 self.transaction(|tx| async move {
113 let room = room::ActiveModel {
114 live_kit_room: ActiveValue::set(live_kit_room.into()),
115 enviroment: ActiveValue::set(Some(release_channel.to_string())),
116 ..Default::default()
117 }
118 .insert(&*tx)
119 .await?;
120 room_participant::ActiveModel {
121 room_id: ActiveValue::set(room.id),
122 user_id: ActiveValue::set(user_id),
123 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
124 answering_connection_server_id: ActiveValue::set(Some(ServerId(
125 connection.owner_id as i32,
126 ))),
127 answering_connection_lost: ActiveValue::set(false),
128 calling_user_id: ActiveValue::set(user_id),
129 calling_connection_id: ActiveValue::set(connection.id as i32),
130 calling_connection_server_id: ActiveValue::set(Some(ServerId(
131 connection.owner_id as i32,
132 ))),
133 participant_index: ActiveValue::set(Some(0)),
134 ..Default::default()
135 }
136 .insert(&*tx)
137 .await?;
138
139 let room = self.get_room(room.id, &tx).await?;
140 Ok(room)
141 })
142 .await
143 }
144
145 pub async fn call(
146 &self,
147 room_id: RoomId,
148 calling_user_id: UserId,
149 calling_connection: ConnectionId,
150 called_user_id: UserId,
151 initial_project_id: Option<ProjectId>,
152 ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
153 self.room_transaction(room_id, |tx| async move {
154 room_participant::ActiveModel {
155 room_id: ActiveValue::set(room_id),
156 user_id: ActiveValue::set(called_user_id),
157 answering_connection_lost: ActiveValue::set(false),
158 participant_index: ActiveValue::NotSet,
159 calling_user_id: ActiveValue::set(calling_user_id),
160 calling_connection_id: ActiveValue::set(calling_connection.id as i32),
161 calling_connection_server_id: ActiveValue::set(Some(ServerId(
162 calling_connection.owner_id as i32,
163 ))),
164 initial_project_id: ActiveValue::set(initial_project_id),
165 ..Default::default()
166 }
167 .insert(&*tx)
168 .await?;
169
170 let room = self.get_room(room_id, &tx).await?;
171 let incoming_call = Self::build_incoming_call(&room, called_user_id)
172 .ok_or_else(|| anyhow!("failed to build incoming call"))?;
173 Ok((room, incoming_call))
174 })
175 .await
176 }
177
178 pub async fn call_failed(
179 &self,
180 room_id: RoomId,
181 called_user_id: UserId,
182 ) -> Result<RoomGuard<proto::Room>> {
183 self.room_transaction(room_id, |tx| async move {
184 room_participant::Entity::delete_many()
185 .filter(
186 room_participant::Column::RoomId
187 .eq(room_id)
188 .and(room_participant::Column::UserId.eq(called_user_id)),
189 )
190 .exec(&*tx)
191 .await?;
192 let room = self.get_room(room_id, &tx).await?;
193 Ok(room)
194 })
195 .await
196 }
197
198 pub async fn decline_call(
199 &self,
200 expected_room_id: Option<RoomId>,
201 user_id: UserId,
202 ) -> Result<Option<RoomGuard<proto::Room>>> {
203 self.optional_room_transaction(|tx| async move {
204 let mut filter = Condition::all()
205 .add(room_participant::Column::UserId.eq(user_id))
206 .add(room_participant::Column::AnsweringConnectionId.is_null());
207 if let Some(room_id) = expected_room_id {
208 filter = filter.add(room_participant::Column::RoomId.eq(room_id));
209 }
210 let participant = room_participant::Entity::find()
211 .filter(filter)
212 .one(&*tx)
213 .await?;
214
215 let participant = if let Some(participant) = participant {
216 participant
217 } else if expected_room_id.is_some() {
218 return Err(anyhow!("could not find call to decline"))?;
219 } else {
220 return Ok(None);
221 };
222
223 let room_id = participant.room_id;
224 room_participant::Entity::delete(participant.into_active_model())
225 .exec(&*tx)
226 .await?;
227
228 let room = self.get_room(room_id, &tx).await?;
229 Ok(Some((room_id, room)))
230 })
231 .await
232 }
233
234 pub async fn cancel_call(
235 &self,
236 room_id: RoomId,
237 calling_connection: ConnectionId,
238 called_user_id: UserId,
239 ) -> Result<RoomGuard<proto::Room>> {
240 self.room_transaction(room_id, |tx| async move {
241 let participant = room_participant::Entity::find()
242 .filter(
243 Condition::all()
244 .add(room_participant::Column::UserId.eq(called_user_id))
245 .add(room_participant::Column::RoomId.eq(room_id))
246 .add(
247 room_participant::Column::CallingConnectionId
248 .eq(calling_connection.id as i32),
249 )
250 .add(
251 room_participant::Column::CallingConnectionServerId
252 .eq(calling_connection.owner_id as i32),
253 )
254 .add(room_participant::Column::AnsweringConnectionId.is_null()),
255 )
256 .one(&*tx)
257 .await?
258 .ok_or_else(|| anyhow!("no call to cancel"))?;
259
260 room_participant::Entity::delete(participant.into_active_model())
261 .exec(&*tx)
262 .await?;
263
264 let room = self.get_room(room_id, &tx).await?;
265 Ok(room)
266 })
267 .await
268 }
269
270 pub async fn join_room(
271 &self,
272 room_id: RoomId,
273 user_id: UserId,
274 connection: ConnectionId,
275 enviroment: &str,
276 ) -> Result<RoomGuard<JoinRoom>> {
277 self.room_transaction(room_id, |tx| async move {
278 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
279 enum QueryChannelIdAndEnviroment {
280 ChannelId,
281 Enviroment,
282 }
283
284 let (channel_id, release_channel): (Option<ChannelId>, Option<String>) =
285 room::Entity::find()
286 .select_only()
287 .column(room::Column::ChannelId)
288 .column(room::Column::Enviroment)
289 .filter(room::Column::Id.eq(room_id))
290 .into_values::<_, QueryChannelIdAndEnviroment>()
291 .one(&*tx)
292 .await?
293 .ok_or_else(|| anyhow!("no such room"))?;
294
295 if let Some(release_channel) = release_channel {
296 if &release_channel != enviroment {
297 Err(anyhow!("must join using the {} release", release_channel))?;
298 }
299 }
300
301 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
302 enum QueryParticipantIndices {
303 ParticipantIndex,
304 }
305 let existing_participant_indices: Vec<i32> = room_participant::Entity::find()
306 .filter(
307 room_participant::Column::RoomId
308 .eq(room_id)
309 .and(room_participant::Column::ParticipantIndex.is_not_null()),
310 )
311 .select_only()
312 .column(room_participant::Column::ParticipantIndex)
313 .into_values::<_, QueryParticipantIndices>()
314 .all(&*tx)
315 .await?;
316
317 let mut participant_index = 0;
318 while existing_participant_indices.contains(&participant_index) {
319 participant_index += 1;
320 }
321
322 if let Some(channel_id) = channel_id {
323 self.check_user_is_channel_member(channel_id, user_id, &*tx)
324 .await?;
325
326 room_participant::Entity::insert_many([room_participant::ActiveModel {
327 room_id: ActiveValue::set(room_id),
328 user_id: ActiveValue::set(user_id),
329 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
330 answering_connection_server_id: ActiveValue::set(Some(ServerId(
331 connection.owner_id as i32,
332 ))),
333 answering_connection_lost: ActiveValue::set(false),
334 calling_user_id: ActiveValue::set(user_id),
335 calling_connection_id: ActiveValue::set(connection.id as i32),
336 calling_connection_server_id: ActiveValue::set(Some(ServerId(
337 connection.owner_id as i32,
338 ))),
339 participant_index: ActiveValue::Set(Some(participant_index)),
340 ..Default::default()
341 }])
342 .on_conflict(
343 OnConflict::columns([room_participant::Column::UserId])
344 .update_columns([
345 room_participant::Column::AnsweringConnectionId,
346 room_participant::Column::AnsweringConnectionServerId,
347 room_participant::Column::AnsweringConnectionLost,
348 room_participant::Column::ParticipantIndex,
349 ])
350 .to_owned(),
351 )
352 .exec(&*tx)
353 .await?;
354 } else {
355 let result = room_participant::Entity::update_many()
356 .filter(
357 Condition::all()
358 .add(room_participant::Column::RoomId.eq(room_id))
359 .add(room_participant::Column::UserId.eq(user_id))
360 .add(room_participant::Column::AnsweringConnectionId.is_null()),
361 )
362 .set(room_participant::ActiveModel {
363 participant_index: ActiveValue::Set(Some(participant_index)),
364 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
365 answering_connection_server_id: ActiveValue::set(Some(ServerId(
366 connection.owner_id as i32,
367 ))),
368 answering_connection_lost: ActiveValue::set(false),
369 ..Default::default()
370 })
371 .exec(&*tx)
372 .await?;
373 if result.rows_affected == 0 {
374 Err(anyhow!("room does not exist or was already joined"))?;
375 }
376 }
377
378 let room = self.get_room(room_id, &tx).await?;
379 let channel_members = if let Some(channel_id) = channel_id {
380 self.get_channel_members_internal(channel_id, &tx).await?
381 } else {
382 Vec::new()
383 };
384 Ok(JoinRoom {
385 room,
386 channel_id,
387 channel_members,
388 })
389 })
390 .await
391 }
392
393 pub async fn rejoin_room(
394 &self,
395 rejoin_room: proto::RejoinRoom,
396 user_id: UserId,
397 connection: ConnectionId,
398 ) -> Result<RoomGuard<RejoinedRoom>> {
399 let room_id = RoomId::from_proto(rejoin_room.id);
400 self.room_transaction(room_id, |tx| async {
401 let tx = tx;
402 let participant_update = room_participant::Entity::update_many()
403 .filter(
404 Condition::all()
405 .add(room_participant::Column::RoomId.eq(room_id))
406 .add(room_participant::Column::UserId.eq(user_id))
407 .add(room_participant::Column::AnsweringConnectionId.is_not_null())
408 .add(
409 Condition::any()
410 .add(room_participant::Column::AnsweringConnectionLost.eq(true))
411 .add(
412 room_participant::Column::AnsweringConnectionServerId
413 .ne(connection.owner_id as i32),
414 ),
415 ),
416 )
417 .set(room_participant::ActiveModel {
418 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
419 answering_connection_server_id: ActiveValue::set(Some(ServerId(
420 connection.owner_id as i32,
421 ))),
422 answering_connection_lost: ActiveValue::set(false),
423 ..Default::default()
424 })
425 .exec(&*tx)
426 .await?;
427 if participant_update.rows_affected == 0 {
428 return Err(anyhow!("room does not exist or was already joined"))?;
429 }
430
431 let mut reshared_projects = Vec::new();
432 for reshared_project in &rejoin_room.reshared_projects {
433 let project_id = ProjectId::from_proto(reshared_project.project_id);
434 let project = project::Entity::find_by_id(project_id)
435 .one(&*tx)
436 .await?
437 .ok_or_else(|| anyhow!("project does not exist"))?;
438 if project.host_user_id != user_id {
439 return Err(anyhow!("no such project"))?;
440 }
441
442 let mut collaborators = project
443 .find_related(project_collaborator::Entity)
444 .all(&*tx)
445 .await?;
446 let host_ix = collaborators
447 .iter()
448 .position(|collaborator| {
449 collaborator.user_id == user_id && collaborator.is_host
450 })
451 .ok_or_else(|| anyhow!("host not found among collaborators"))?;
452 let host = collaborators.swap_remove(host_ix);
453 let old_connection_id = host.connection();
454
455 project::Entity::update(project::ActiveModel {
456 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
457 host_connection_server_id: ActiveValue::set(Some(ServerId(
458 connection.owner_id as i32,
459 ))),
460 ..project.into_active_model()
461 })
462 .exec(&*tx)
463 .await?;
464 project_collaborator::Entity::update(project_collaborator::ActiveModel {
465 connection_id: ActiveValue::set(connection.id as i32),
466 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
467 ..host.into_active_model()
468 })
469 .exec(&*tx)
470 .await?;
471
472 self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
473 .await?;
474
475 reshared_projects.push(ResharedProject {
476 id: project_id,
477 old_connection_id,
478 collaborators: collaborators
479 .iter()
480 .map(|collaborator| ProjectCollaborator {
481 connection_id: collaborator.connection(),
482 user_id: collaborator.user_id,
483 replica_id: collaborator.replica_id,
484 is_host: collaborator.is_host,
485 })
486 .collect(),
487 worktrees: reshared_project.worktrees.clone(),
488 });
489 }
490
491 project::Entity::delete_many()
492 .filter(
493 Condition::all()
494 .add(project::Column::RoomId.eq(room_id))
495 .add(project::Column::HostUserId.eq(user_id))
496 .add(
497 project::Column::Id
498 .is_not_in(reshared_projects.iter().map(|project| project.id)),
499 ),
500 )
501 .exec(&*tx)
502 .await?;
503
504 let mut rejoined_projects = Vec::new();
505 for rejoined_project in &rejoin_room.rejoined_projects {
506 let project_id = ProjectId::from_proto(rejoined_project.id);
507 let Some(project) = project::Entity::find_by_id(project_id).one(&*tx).await? else {
508 continue;
509 };
510
511 let mut worktrees = Vec::new();
512 let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
513 for db_worktree in db_worktrees {
514 let mut worktree = RejoinedWorktree {
515 id: db_worktree.id as u64,
516 abs_path: db_worktree.abs_path,
517 root_name: db_worktree.root_name,
518 visible: db_worktree.visible,
519 updated_entries: Default::default(),
520 removed_entries: Default::default(),
521 updated_repositories: Default::default(),
522 removed_repositories: Default::default(),
523 diagnostic_summaries: Default::default(),
524 settings_files: Default::default(),
525 scan_id: db_worktree.scan_id as u64,
526 completed_scan_id: db_worktree.completed_scan_id as u64,
527 };
528
529 let rejoined_worktree = rejoined_project
530 .worktrees
531 .iter()
532 .find(|worktree| worktree.id == db_worktree.id as u64);
533
534 // File entries
535 {
536 let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
537 worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
538 } else {
539 worktree_entry::Column::IsDeleted.eq(false)
540 };
541
542 let mut db_entries = worktree_entry::Entity::find()
543 .filter(
544 Condition::all()
545 .add(worktree_entry::Column::ProjectId.eq(project.id))
546 .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
547 .add(entry_filter),
548 )
549 .stream(&*tx)
550 .await?;
551
552 while let Some(db_entry) = db_entries.next().await {
553 let db_entry = db_entry?;
554 if db_entry.is_deleted {
555 worktree.removed_entries.push(db_entry.id as u64);
556 } else {
557 worktree.updated_entries.push(proto::Entry {
558 id: db_entry.id as u64,
559 is_dir: db_entry.is_dir,
560 path: db_entry.path,
561 inode: db_entry.inode as u64,
562 mtime: Some(proto::Timestamp {
563 seconds: db_entry.mtime_seconds as u64,
564 nanos: db_entry.mtime_nanos as u32,
565 }),
566 is_symlink: db_entry.is_symlink,
567 is_ignored: db_entry.is_ignored,
568 is_external: db_entry.is_external,
569 git_status: db_entry.git_status.map(|status| status as i32),
570 });
571 }
572 }
573 }
574
575 // Repository Entries
576 {
577 let repository_entry_filter =
578 if let Some(rejoined_worktree) = rejoined_worktree {
579 worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
580 } else {
581 worktree_repository::Column::IsDeleted.eq(false)
582 };
583
584 let mut db_repositories = worktree_repository::Entity::find()
585 .filter(
586 Condition::all()
587 .add(worktree_repository::Column::ProjectId.eq(project.id))
588 .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
589 .add(repository_entry_filter),
590 )
591 .stream(&*tx)
592 .await?;
593
594 while let Some(db_repository) = db_repositories.next().await {
595 let db_repository = db_repository?;
596 if db_repository.is_deleted {
597 worktree
598 .removed_repositories
599 .push(db_repository.work_directory_id as u64);
600 } else {
601 worktree.updated_repositories.push(proto::RepositoryEntry {
602 work_directory_id: db_repository.work_directory_id as u64,
603 branch: db_repository.branch,
604 });
605 }
606 }
607 }
608
609 worktrees.push(worktree);
610 }
611
612 let language_servers = project
613 .find_related(language_server::Entity)
614 .all(&*tx)
615 .await?
616 .into_iter()
617 .map(|language_server| proto::LanguageServer {
618 id: language_server.id as u64,
619 name: language_server.name,
620 })
621 .collect::<Vec<_>>();
622
623 {
624 let mut db_settings_files = worktree_settings_file::Entity::find()
625 .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
626 .stream(&*tx)
627 .await?;
628 while let Some(db_settings_file) = db_settings_files.next().await {
629 let db_settings_file = db_settings_file?;
630 if let Some(worktree) = worktrees
631 .iter_mut()
632 .find(|w| w.id == db_settings_file.worktree_id as u64)
633 {
634 worktree.settings_files.push(WorktreeSettingsFile {
635 path: db_settings_file.path,
636 content: db_settings_file.content,
637 });
638 }
639 }
640 }
641
642 let mut collaborators = project
643 .find_related(project_collaborator::Entity)
644 .all(&*tx)
645 .await?;
646 let self_collaborator = if let Some(self_collaborator_ix) = collaborators
647 .iter()
648 .position(|collaborator| collaborator.user_id == user_id)
649 {
650 collaborators.swap_remove(self_collaborator_ix)
651 } else {
652 continue;
653 };
654 let old_connection_id = self_collaborator.connection();
655 project_collaborator::Entity::update(project_collaborator::ActiveModel {
656 connection_id: ActiveValue::set(connection.id as i32),
657 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
658 ..self_collaborator.into_active_model()
659 })
660 .exec(&*tx)
661 .await?;
662
663 let collaborators = collaborators
664 .into_iter()
665 .map(|collaborator| ProjectCollaborator {
666 connection_id: collaborator.connection(),
667 user_id: collaborator.user_id,
668 replica_id: collaborator.replica_id,
669 is_host: collaborator.is_host,
670 })
671 .collect::<Vec<_>>();
672
673 rejoined_projects.push(RejoinedProject {
674 id: project_id,
675 old_connection_id,
676 collaborators,
677 worktrees,
678 language_servers,
679 });
680 }
681
682 let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
683 let channel_members = if let Some(channel_id) = channel_id {
684 self.get_channel_members_internal(channel_id, &tx).await?
685 } else {
686 Vec::new()
687 };
688
689 Ok(RejoinedRoom {
690 room,
691 channel_id,
692 channel_members,
693 rejoined_projects,
694 reshared_projects,
695 })
696 })
697 .await
698 }
699
700 pub async fn leave_room(
701 &self,
702 connection: ConnectionId,
703 ) -> Result<Option<RoomGuard<LeftRoom>>> {
704 self.optional_room_transaction(|tx| async move {
705 let leaving_participant = room_participant::Entity::find()
706 .filter(
707 Condition::all()
708 .add(
709 room_participant::Column::AnsweringConnectionId
710 .eq(connection.id as i32),
711 )
712 .add(
713 room_participant::Column::AnsweringConnectionServerId
714 .eq(connection.owner_id as i32),
715 ),
716 )
717 .one(&*tx)
718 .await?;
719
720 if let Some(leaving_participant) = leaving_participant {
721 // Leave room.
722 let room_id = leaving_participant.room_id;
723 room_participant::Entity::delete_by_id(leaving_participant.id)
724 .exec(&*tx)
725 .await?;
726
727 // Cancel pending calls initiated by the leaving user.
728 let called_participants = room_participant::Entity::find()
729 .filter(
730 Condition::all()
731 .add(
732 room_participant::Column::CallingUserId
733 .eq(leaving_participant.user_id),
734 )
735 .add(room_participant::Column::AnsweringConnectionId.is_null()),
736 )
737 .all(&*tx)
738 .await?;
739 room_participant::Entity::delete_many()
740 .filter(
741 room_participant::Column::Id
742 .is_in(called_participants.iter().map(|participant| participant.id)),
743 )
744 .exec(&*tx)
745 .await?;
746 let canceled_calls_to_user_ids = called_participants
747 .into_iter()
748 .map(|participant| participant.user_id)
749 .collect();
750
751 // Detect left projects.
752 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
753 enum QueryProjectIds {
754 ProjectId,
755 }
756 let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
757 .select_only()
758 .column_as(
759 project_collaborator::Column::ProjectId,
760 QueryProjectIds::ProjectId,
761 )
762 .filter(
763 Condition::all()
764 .add(
765 project_collaborator::Column::ConnectionId.eq(connection.id as i32),
766 )
767 .add(
768 project_collaborator::Column::ConnectionServerId
769 .eq(connection.owner_id as i32),
770 ),
771 )
772 .into_values::<_, QueryProjectIds>()
773 .all(&*tx)
774 .await?;
775 let mut left_projects = HashMap::default();
776 let mut collaborators = project_collaborator::Entity::find()
777 .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
778 .stream(&*tx)
779 .await?;
780 while let Some(collaborator) = collaborators.next().await {
781 let collaborator = collaborator?;
782 let left_project =
783 left_projects
784 .entry(collaborator.project_id)
785 .or_insert(LeftProject {
786 id: collaborator.project_id,
787 host_user_id: Default::default(),
788 connection_ids: Default::default(),
789 host_connection_id: Default::default(),
790 });
791
792 let collaborator_connection_id = collaborator.connection();
793 if collaborator_connection_id != connection {
794 left_project.connection_ids.push(collaborator_connection_id);
795 }
796
797 if collaborator.is_host {
798 left_project.host_user_id = collaborator.user_id;
799 left_project.host_connection_id = collaborator_connection_id;
800 }
801 }
802 drop(collaborators);
803
804 // Leave projects.
805 project_collaborator::Entity::delete_many()
806 .filter(
807 Condition::all()
808 .add(
809 project_collaborator::Column::ConnectionId.eq(connection.id as i32),
810 )
811 .add(
812 project_collaborator::Column::ConnectionServerId
813 .eq(connection.owner_id as i32),
814 ),
815 )
816 .exec(&*tx)
817 .await?;
818
819 // Unshare projects.
820 project::Entity::delete_many()
821 .filter(
822 Condition::all()
823 .add(project::Column::RoomId.eq(room_id))
824 .add(project::Column::HostConnectionId.eq(connection.id as i32))
825 .add(
826 project::Column::HostConnectionServerId
827 .eq(connection.owner_id as i32),
828 ),
829 )
830 .exec(&*tx)
831 .await?;
832
833 let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
834 let deleted = if room.participants.is_empty() {
835 let result = room::Entity::delete_by_id(room_id).exec(&*tx).await?;
836 result.rows_affected > 0
837 } else {
838 false
839 };
840
841 let channel_members = if let Some(channel_id) = channel_id {
842 self.get_channel_members_internal(channel_id, &tx).await?
843 } else {
844 Vec::new()
845 };
846 let left_room = LeftRoom {
847 room,
848 channel_id,
849 channel_members,
850 left_projects,
851 canceled_calls_to_user_ids,
852 deleted,
853 };
854
855 if left_room.room.participants.is_empty() {
856 self.rooms.remove(&room_id);
857 }
858
859 Ok(Some((room_id, left_room)))
860 } else {
861 Ok(None)
862 }
863 })
864 .await
865 }
866
867 pub async fn update_room_participant_location(
868 &self,
869 room_id: RoomId,
870 connection: ConnectionId,
871 location: proto::ParticipantLocation,
872 ) -> Result<RoomGuard<proto::Room>> {
873 self.room_transaction(room_id, |tx| async {
874 let tx = tx;
875 let location_kind;
876 let location_project_id;
877 match location
878 .variant
879 .as_ref()
880 .ok_or_else(|| anyhow!("invalid location"))?
881 {
882 proto::participant_location::Variant::SharedProject(project) => {
883 location_kind = 0;
884 location_project_id = Some(ProjectId::from_proto(project.id));
885 }
886 proto::participant_location::Variant::UnsharedProject(_) => {
887 location_kind = 1;
888 location_project_id = None;
889 }
890 proto::participant_location::Variant::External(_) => {
891 location_kind = 2;
892 location_project_id = None;
893 }
894 }
895
896 let result = room_participant::Entity::update_many()
897 .filter(
898 Condition::all()
899 .add(room_participant::Column::RoomId.eq(room_id))
900 .add(
901 room_participant::Column::AnsweringConnectionId
902 .eq(connection.id as i32),
903 )
904 .add(
905 room_participant::Column::AnsweringConnectionServerId
906 .eq(connection.owner_id as i32),
907 ),
908 )
909 .set(room_participant::ActiveModel {
910 location_kind: ActiveValue::set(Some(location_kind)),
911 location_project_id: ActiveValue::set(location_project_id),
912 ..Default::default()
913 })
914 .exec(&*tx)
915 .await?;
916
917 if result.rows_affected == 1 {
918 let room = self.get_room(room_id, &tx).await?;
919 Ok(room)
920 } else {
921 Err(anyhow!("could not update room participant location"))?
922 }
923 })
924 .await
925 }
926
927 pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
928 self.transaction(|tx| async move {
929 self.room_connection_lost(connection, &*tx).await?;
930 self.channel_buffer_connection_lost(connection, &*tx)
931 .await?;
932 self.channel_chat_connection_lost(connection, &*tx).await?;
933 Ok(())
934 })
935 .await
936 }
937
938 pub async fn room_connection_lost(
939 &self,
940 connection: ConnectionId,
941 tx: &DatabaseTransaction,
942 ) -> Result<()> {
943 let participant = room_participant::Entity::find()
944 .filter(
945 Condition::all()
946 .add(room_participant::Column::AnsweringConnectionId.eq(connection.id as i32))
947 .add(
948 room_participant::Column::AnsweringConnectionServerId
949 .eq(connection.owner_id as i32),
950 ),
951 )
952 .one(&*tx)
953 .await?;
954
955 if let Some(participant) = participant {
956 room_participant::Entity::update(room_participant::ActiveModel {
957 answering_connection_lost: ActiveValue::set(true),
958 ..participant.into_active_model()
959 })
960 .exec(&*tx)
961 .await?;
962 }
963 Ok(())
964 }
965
966 fn build_incoming_call(
967 room: &proto::Room,
968 called_user_id: UserId,
969 ) -> Option<proto::IncomingCall> {
970 let pending_participant = room
971 .pending_participants
972 .iter()
973 .find(|participant| participant.user_id == called_user_id.to_proto())?;
974
975 Some(proto::IncomingCall {
976 room_id: room.id,
977 calling_user_id: pending_participant.calling_user_id,
978 participant_user_ids: room
979 .participants
980 .iter()
981 .map(|participant| participant.user_id)
982 .collect(),
983 initial_project: room.participants.iter().find_map(|participant| {
984 let initial_project_id = pending_participant.initial_project_id?;
985 participant
986 .projects
987 .iter()
988 .find(|project| project.id == initial_project_id)
989 .cloned()
990 }),
991 })
992 }
993
994 pub async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
995 let (_, room) = self.get_channel_room(room_id, tx).await?;
996 Ok(room)
997 }
998
999 pub async fn room_connection_ids(
1000 &self,
1001 room_id: RoomId,
1002 connection_id: ConnectionId,
1003 ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
1004 self.room_transaction(room_id, |tx| async move {
1005 let mut participants = room_participant::Entity::find()
1006 .filter(room_participant::Column::RoomId.eq(room_id))
1007 .stream(&*tx)
1008 .await?;
1009
1010 let mut is_participant = false;
1011 let mut connection_ids = HashSet::default();
1012 while let Some(participant) = participants.next().await {
1013 let participant = participant?;
1014 if let Some(answering_connection) = participant.answering_connection() {
1015 if answering_connection == connection_id {
1016 is_participant = true;
1017 } else {
1018 connection_ids.insert(answering_connection);
1019 }
1020 }
1021 }
1022
1023 if !is_participant {
1024 Err(anyhow!("not a room participant"))?;
1025 }
1026
1027 Ok(connection_ids)
1028 })
1029 .await
1030 }
1031
1032 async fn get_channel_room(
1033 &self,
1034 room_id: RoomId,
1035 tx: &DatabaseTransaction,
1036 ) -> Result<(Option<ChannelId>, proto::Room)> {
1037 let db_room = room::Entity::find_by_id(room_id)
1038 .one(tx)
1039 .await?
1040 .ok_or_else(|| anyhow!("could not find room"))?;
1041
1042 let mut db_participants = db_room
1043 .find_related(room_participant::Entity)
1044 .stream(tx)
1045 .await?;
1046 let mut participants = HashMap::default();
1047 let mut pending_participants = Vec::new();
1048 while let Some(db_participant) = db_participants.next().await {
1049 let db_participant = db_participant?;
1050 if let (
1051 Some(answering_connection_id),
1052 Some(answering_connection_server_id),
1053 Some(participant_index),
1054 ) = (
1055 db_participant.answering_connection_id,
1056 db_participant.answering_connection_server_id,
1057 db_participant.participant_index,
1058 ) {
1059 let location = match (
1060 db_participant.location_kind,
1061 db_participant.location_project_id,
1062 ) {
1063 (Some(0), Some(project_id)) => {
1064 Some(proto::participant_location::Variant::SharedProject(
1065 proto::participant_location::SharedProject {
1066 id: project_id.to_proto(),
1067 },
1068 ))
1069 }
1070 (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1071 Default::default(),
1072 )),
1073 _ => Some(proto::participant_location::Variant::External(
1074 Default::default(),
1075 )),
1076 };
1077
1078 let answering_connection = ConnectionId {
1079 owner_id: answering_connection_server_id.0 as u32,
1080 id: answering_connection_id as u32,
1081 };
1082 participants.insert(
1083 answering_connection,
1084 proto::Participant {
1085 user_id: db_participant.user_id.to_proto(),
1086 peer_id: Some(answering_connection.into()),
1087 projects: Default::default(),
1088 location: Some(proto::ParticipantLocation { variant: location }),
1089 participant_index: participant_index as u32,
1090 },
1091 );
1092 } else {
1093 pending_participants.push(proto::PendingParticipant {
1094 user_id: db_participant.user_id.to_proto(),
1095 calling_user_id: db_participant.calling_user_id.to_proto(),
1096 initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1097 });
1098 }
1099 }
1100 drop(db_participants);
1101
1102 let mut db_projects = db_room
1103 .find_related(project::Entity)
1104 .find_with_related(worktree::Entity)
1105 .stream(tx)
1106 .await?;
1107
1108 while let Some(row) = db_projects.next().await {
1109 let (db_project, db_worktree) = row?;
1110 let host_connection = db_project.host_connection()?;
1111 if let Some(participant) = participants.get_mut(&host_connection) {
1112 let project = if let Some(project) = participant
1113 .projects
1114 .iter_mut()
1115 .find(|project| project.id == db_project.id.to_proto())
1116 {
1117 project
1118 } else {
1119 participant.projects.push(proto::ParticipantProject {
1120 id: db_project.id.to_proto(),
1121 worktree_root_names: Default::default(),
1122 });
1123 participant.projects.last_mut().unwrap()
1124 };
1125
1126 if let Some(db_worktree) = db_worktree {
1127 if db_worktree.visible {
1128 project.worktree_root_names.push(db_worktree.root_name);
1129 }
1130 }
1131 }
1132 }
1133 drop(db_projects);
1134
1135 let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
1136 let mut followers = Vec::new();
1137 while let Some(db_follower) = db_followers.next().await {
1138 let db_follower = db_follower?;
1139 followers.push(proto::Follower {
1140 leader_id: Some(db_follower.leader_connection().into()),
1141 follower_id: Some(db_follower.follower_connection().into()),
1142 project_id: db_follower.project_id.to_proto(),
1143 });
1144 }
1145
1146 Ok((
1147 db_room.channel_id,
1148 proto::Room {
1149 id: db_room.id.to_proto(),
1150 live_kit_room: db_room.live_kit_room,
1151 participants: participants.into_values().collect(),
1152 pending_participants,
1153 followers,
1154 },
1155 ))
1156 }
1157}