1use super::*;
2
3impl Database {
4 pub async fn clear_stale_room_participants(
5 &self,
6 room_id: RoomId,
7 new_server_id: ServerId,
8 ) -> Result<RoomGuard<RefreshedRoom>> {
9 self.room_transaction(room_id, |tx| async move {
10 let stale_participant_filter = Condition::all()
11 .add(room_participant::Column::RoomId.eq(room_id))
12 .add(room_participant::Column::AnsweringConnectionId.is_not_null())
13 .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
14
15 let stale_participant_user_ids = room_participant::Entity::find()
16 .filter(stale_participant_filter.clone())
17 .all(&*tx)
18 .await?
19 .into_iter()
20 .map(|participant| participant.user_id)
21 .collect::<Vec<_>>();
22
23 // Delete participants who failed to reconnect and cancel their calls.
24 let mut canceled_calls_to_user_ids = Vec::new();
25 room_participant::Entity::delete_many()
26 .filter(stale_participant_filter)
27 .exec(&*tx)
28 .await?;
29 let called_participants = room_participant::Entity::find()
30 .filter(
31 Condition::all()
32 .add(
33 room_participant::Column::CallingUserId
34 .is_in(stale_participant_user_ids.iter().copied()),
35 )
36 .add(room_participant::Column::AnsweringConnectionId.is_null()),
37 )
38 .all(&*tx)
39 .await?;
40 room_participant::Entity::delete_many()
41 .filter(
42 room_participant::Column::Id
43 .is_in(called_participants.iter().map(|participant| participant.id)),
44 )
45 .exec(&*tx)
46 .await?;
47 canceled_calls_to_user_ids.extend(
48 called_participants
49 .into_iter()
50 .map(|participant| participant.user_id),
51 );
52
53 let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
54 let channel_members;
55 if let Some(channel_id) = channel_id {
56 channel_members = self.get_channel_members_internal(channel_id, &tx).await?;
57 } else {
58 channel_members = Vec::new();
59
60 // Delete the room if it becomes empty.
61 if room.participants.is_empty() {
62 project::Entity::delete_many()
63 .filter(project::Column::RoomId.eq(room_id))
64 .exec(&*tx)
65 .await?;
66 room::Entity::delete_by_id(room_id).exec(&*tx).await?;
67 }
68 };
69
70 Ok(RefreshedRoom {
71 room,
72 channel_id,
73 channel_members,
74 stale_participant_user_ids,
75 canceled_calls_to_user_ids,
76 })
77 })
78 .await
79 }
80
81 pub async fn incoming_call_for_user(
82 &self,
83 user_id: UserId,
84 ) -> Result<Option<proto::IncomingCall>> {
85 self.transaction(|tx| async move {
86 let pending_participant = room_participant::Entity::find()
87 .filter(
88 room_participant::Column::UserId
89 .eq(user_id)
90 .and(room_participant::Column::AnsweringConnectionId.is_null()),
91 )
92 .one(&*tx)
93 .await?;
94
95 if let Some(pending_participant) = pending_participant {
96 let room = self.get_room(pending_participant.room_id, &tx).await?;
97 Ok(Self::build_incoming_call(&room, user_id))
98 } else {
99 Ok(None)
100 }
101 })
102 .await
103 }
104
105 pub async fn create_room(
106 &self,
107 user_id: UserId,
108 connection: ConnectionId,
109 live_kit_room: &str,
110 ) -> Result<proto::Room> {
111 self.transaction(|tx| async move {
112 let room = room::ActiveModel {
113 live_kit_room: ActiveValue::set(live_kit_room.into()),
114 ..Default::default()
115 }
116 .insert(&*tx)
117 .await?;
118 room_participant::ActiveModel {
119 room_id: ActiveValue::set(room.id),
120 user_id: ActiveValue::set(user_id),
121 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
122 answering_connection_server_id: ActiveValue::set(Some(ServerId(
123 connection.owner_id as i32,
124 ))),
125 answering_connection_lost: ActiveValue::set(false),
126 calling_user_id: ActiveValue::set(user_id),
127 calling_connection_id: ActiveValue::set(connection.id as i32),
128 calling_connection_server_id: ActiveValue::set(Some(ServerId(
129 connection.owner_id as i32,
130 ))),
131 ..Default::default()
132 }
133 .insert(&*tx)
134 .await?;
135
136 let room = self.get_room(room.id, &tx).await?;
137 Ok(room)
138 })
139 .await
140 }
141
142 pub async fn call(
143 &self,
144 room_id: RoomId,
145 calling_user_id: UserId,
146 calling_connection: ConnectionId,
147 called_user_id: UserId,
148 initial_project_id: Option<ProjectId>,
149 ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
150 self.room_transaction(room_id, |tx| async move {
151 room_participant::ActiveModel {
152 room_id: ActiveValue::set(room_id),
153 user_id: ActiveValue::set(called_user_id),
154 answering_connection_lost: ActiveValue::set(false),
155 calling_user_id: ActiveValue::set(calling_user_id),
156 calling_connection_id: ActiveValue::set(calling_connection.id as i32),
157 calling_connection_server_id: ActiveValue::set(Some(ServerId(
158 calling_connection.owner_id as i32,
159 ))),
160 initial_project_id: ActiveValue::set(initial_project_id),
161 ..Default::default()
162 }
163 .insert(&*tx)
164 .await?;
165
166 let room = self.get_room(room_id, &tx).await?;
167 let incoming_call = Self::build_incoming_call(&room, called_user_id)
168 .ok_or_else(|| anyhow!("failed to build incoming call"))?;
169 Ok((room, incoming_call))
170 })
171 .await
172 }
173
174 pub async fn call_failed(
175 &self,
176 room_id: RoomId,
177 called_user_id: UserId,
178 ) -> Result<RoomGuard<proto::Room>> {
179 self.room_transaction(room_id, |tx| async move {
180 room_participant::Entity::delete_many()
181 .filter(
182 room_participant::Column::RoomId
183 .eq(room_id)
184 .and(room_participant::Column::UserId.eq(called_user_id)),
185 )
186 .exec(&*tx)
187 .await?;
188 let room = self.get_room(room_id, &tx).await?;
189 Ok(room)
190 })
191 .await
192 }
193
194 pub async fn decline_call(
195 &self,
196 expected_room_id: Option<RoomId>,
197 user_id: UserId,
198 ) -> Result<Option<RoomGuard<proto::Room>>> {
199 self.optional_room_transaction(|tx| async move {
200 let mut filter = Condition::all()
201 .add(room_participant::Column::UserId.eq(user_id))
202 .add(room_participant::Column::AnsweringConnectionId.is_null());
203 if let Some(room_id) = expected_room_id {
204 filter = filter.add(room_participant::Column::RoomId.eq(room_id));
205 }
206 let participant = room_participant::Entity::find()
207 .filter(filter)
208 .one(&*tx)
209 .await?;
210
211 let participant = if let Some(participant) = participant {
212 participant
213 } else if expected_room_id.is_some() {
214 return Err(anyhow!("could not find call to decline"))?;
215 } else {
216 return Ok(None);
217 };
218
219 let room_id = participant.room_id;
220 room_participant::Entity::delete(participant.into_active_model())
221 .exec(&*tx)
222 .await?;
223
224 let room = self.get_room(room_id, &tx).await?;
225 Ok(Some((room_id, room)))
226 })
227 .await
228 }
229
230 pub async fn cancel_call(
231 &self,
232 room_id: RoomId,
233 calling_connection: ConnectionId,
234 called_user_id: UserId,
235 ) -> Result<RoomGuard<proto::Room>> {
236 self.room_transaction(room_id, |tx| async move {
237 let participant = room_participant::Entity::find()
238 .filter(
239 Condition::all()
240 .add(room_participant::Column::UserId.eq(called_user_id))
241 .add(room_participant::Column::RoomId.eq(room_id))
242 .add(
243 room_participant::Column::CallingConnectionId
244 .eq(calling_connection.id as i32),
245 )
246 .add(
247 room_participant::Column::CallingConnectionServerId
248 .eq(calling_connection.owner_id as i32),
249 )
250 .add(room_participant::Column::AnsweringConnectionId.is_null()),
251 )
252 .one(&*tx)
253 .await?
254 .ok_or_else(|| anyhow!("no call to cancel"))?;
255
256 room_participant::Entity::delete(participant.into_active_model())
257 .exec(&*tx)
258 .await?;
259
260 let room = self.get_room(room_id, &tx).await?;
261 Ok(room)
262 })
263 .await
264 }
265
266 pub async fn join_room(
267 &self,
268 room_id: RoomId,
269 user_id: UserId,
270 connection: ConnectionId,
271 ) -> Result<RoomGuard<JoinRoom>> {
272 self.room_transaction(room_id, |tx| async move {
273 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
274 enum QueryChannelId {
275 ChannelId,
276 }
277 let channel_id: Option<ChannelId> = room::Entity::find()
278 .select_only()
279 .column(room::Column::ChannelId)
280 .filter(room::Column::Id.eq(room_id))
281 .into_values::<_, QueryChannelId>()
282 .one(&*tx)
283 .await?
284 .ok_or_else(|| anyhow!("no such room"))?;
285
286 if let Some(channel_id) = channel_id {
287 self.check_user_is_channel_member(channel_id, user_id, &*tx)
288 .await?;
289
290 room_participant::Entity::insert_many([room_participant::ActiveModel {
291 room_id: ActiveValue::set(room_id),
292 user_id: ActiveValue::set(user_id),
293 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
294 answering_connection_server_id: ActiveValue::set(Some(ServerId(
295 connection.owner_id as i32,
296 ))),
297 answering_connection_lost: ActiveValue::set(false),
298 calling_user_id: ActiveValue::set(user_id),
299 calling_connection_id: ActiveValue::set(connection.id as i32),
300 calling_connection_server_id: ActiveValue::set(Some(ServerId(
301 connection.owner_id as i32,
302 ))),
303 ..Default::default()
304 }])
305 .on_conflict(
306 OnConflict::columns([room_participant::Column::UserId])
307 .update_columns([
308 room_participant::Column::AnsweringConnectionId,
309 room_participant::Column::AnsweringConnectionServerId,
310 room_participant::Column::AnsweringConnectionLost,
311 ])
312 .to_owned(),
313 )
314 .exec(&*tx)
315 .await?;
316 } else {
317 let result = room_participant::Entity::update_many()
318 .filter(
319 Condition::all()
320 .add(room_participant::Column::RoomId.eq(room_id))
321 .add(room_participant::Column::UserId.eq(user_id))
322 .add(room_participant::Column::AnsweringConnectionId.is_null()),
323 )
324 .set(room_participant::ActiveModel {
325 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
326 answering_connection_server_id: ActiveValue::set(Some(ServerId(
327 connection.owner_id as i32,
328 ))),
329 answering_connection_lost: ActiveValue::set(false),
330 ..Default::default()
331 })
332 .exec(&*tx)
333 .await?;
334 if result.rows_affected == 0 {
335 Err(anyhow!("room does not exist or was already joined"))?;
336 }
337 }
338
339 let room = self.get_room(room_id, &tx).await?;
340 let channel_members = if let Some(channel_id) = channel_id {
341 self.get_channel_members_internal(channel_id, &tx).await?
342 } else {
343 Vec::new()
344 };
345 Ok(JoinRoom {
346 room,
347 channel_id,
348 channel_members,
349 })
350 })
351 .await
352 }
353
    /// Re-attaches `user_id` to the room named in `rejoin_room` through the new
    /// `connection`.
    ///
    /// The steps, all inside one room transaction, are:
    /// 1. Point the user's participant row at `connection`, provided the row was
    ///    answered and either its connection was marked lost or it lived on a
    ///    different server. If no row is updated the call fails with
    ///    "room does not exist or was already joined".
    /// 2. For every reshared project, verify the user is the host, move the
    ///    project and the host collaborator to the new connection, and update
    ///    the project's worktrees.
    /// 3. Delete projects in the room hosted by the user that were not reshared.
    /// 4. For every rejoined project that still exists and in which the user is
    ///    a collaborator, gather worktree state (entries, repositories, settings
    ///    files), language servers and the other collaborators, and move the
    ///    user's collaborator row to the new connection.
    ///
    /// Returns the refreshed room along with its channel information and the
    /// reshared and rejoined projects.
    pub async fn rejoin_room(
        &self,
        rejoin_room: proto::RejoinRoom,
        user_id: UserId,
        connection: ConnectionId,
    ) -> Result<RoomGuard<RejoinedRoom>> {
        let room_id = RoomId::from_proto(rejoin_room.id);
        self.room_transaction(room_id, |tx| async {
            // The async block is not `move`; rebinding moves only the transaction
            // handle into it, while `rejoin_room` is used through `&` borrows below.
            let tx = tx;
            let participant_update = room_participant::Entity::update_many()
                .filter(
                    Condition::all()
                        .add(room_participant::Column::RoomId.eq(room_id))
                        .add(room_participant::Column::UserId.eq(user_id))
                        .add(room_participant::Column::AnsweringConnectionId.is_not_null())
                        .add(
                            // Rejoining is only allowed when the old connection was lost
                            // or belonged to a different server.
                            Condition::any()
                                .add(room_participant::Column::AnsweringConnectionLost.eq(true))
                                .add(
                                    room_participant::Column::AnsweringConnectionServerId
                                        .ne(connection.owner_id as i32),
                                ),
                        ),
                )
                .set(room_participant::ActiveModel {
                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
                        connection.owner_id as i32,
                    ))),
                    answering_connection_lost: ActiveValue::set(false),
                    ..Default::default()
                })
                .exec(&*tx)
                .await?;
            if participant_update.rows_affected == 0 {
                return Err(anyhow!("room does not exist or was already joined"))?;
            }

            // Projects the user hosts and shares again from the new connection.
            let mut reshared_projects = Vec::new();
            for reshared_project in &rejoin_room.reshared_projects {
                let project_id = ProjectId::from_proto(reshared_project.project_id);
                let project = project::Entity::find_by_id(project_id)
                    .one(&*tx)
                    .await?
                    .ok_or_else(|| anyhow!("project does not exist"))?;
                // Only the host may reshare a project.
                if project.host_user_id != user_id {
                    return Err(anyhow!("no such project"))?;
                }

                let mut collaborators = project
                    .find_related(project_collaborator::Entity)
                    .all(&*tx)
                    .await?;
                // Pull the host's own row out; the remaining rows are the other collaborators.
                let host_ix = collaborators
                    .iter()
                    .position(|collaborator| {
                        collaborator.user_id == user_id && collaborator.is_host
                    })
                    .ok_or_else(|| anyhow!("host not found among collaborators"))?;
                let host = collaborators.swap_remove(host_ix);
                let old_connection_id = host.connection();

                // Move both the project and the host collaborator to the new connection.
                project::Entity::update(project::ActiveModel {
                    host_connection_id: ActiveValue::set(Some(connection.id as i32)),
                    host_connection_server_id: ActiveValue::set(Some(ServerId(
                        connection.owner_id as i32,
                    ))),
                    ..project.into_active_model()
                })
                .exec(&*tx)
                .await?;
                project_collaborator::Entity::update(project_collaborator::ActiveModel {
                    connection_id: ActiveValue::set(connection.id as i32),
                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
                    ..host.into_active_model()
                })
                .exec(&*tx)
                .await?;

                self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
                    .await?;

                reshared_projects.push(ResharedProject {
                    id: project_id,
                    old_connection_id,
                    collaborators: collaborators
                        .iter()
                        .map(|collaborator| ProjectCollaborator {
                            connection_id: collaborator.connection(),
                            user_id: collaborator.user_id,
                            replica_id: collaborator.replica_id,
                            is_host: collaborator.is_host,
                        })
                        .collect(),
                    worktrees: reshared_project.worktrees.clone(),
                });
            }

            // Remove the user's projects in this room that were not reshared.
            project::Entity::delete_many()
                .filter(
                    Condition::all()
                        .add(project::Column::RoomId.eq(room_id))
                        .add(project::Column::HostUserId.eq(user_id))
                        .add(
                            project::Column::Id
                                .is_not_in(reshared_projects.iter().map(|project| project.id)),
                        ),
                )
                .exec(&*tx)
                .await?;

            // Projects the user rejoins as a collaborator.
            let mut rejoined_projects = Vec::new();
            for rejoined_project in &rejoin_room.rejoined_projects {
                let project_id = ProjectId::from_proto(rejoined_project.id);
                // Projects that no longer exist are skipped rather than treated as an error.
                let Some(project) = project::Entity::find_by_id(project_id).one(&*tx).await? else {
                    continue;
                };

                let mut worktrees = Vec::new();
                let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
                for db_worktree in db_worktrees {
                    let mut worktree = RejoinedWorktree {
                        id: db_worktree.id as u64,
                        abs_path: db_worktree.abs_path,
                        root_name: db_worktree.root_name,
                        visible: db_worktree.visible,
                        updated_entries: Default::default(),
                        removed_entries: Default::default(),
                        updated_repositories: Default::default(),
                        removed_repositories: Default::default(),
                        diagnostic_summaries: Default::default(),
                        settings_files: Default::default(),
                        scan_id: db_worktree.scan_id as u64,
                        completed_scan_id: db_worktree.completed_scan_id as u64,
                    };

                    // The client's view of this worktree, if it reported one.
                    let rejoined_worktree = rejoined_project
                        .worktrees
                        .iter()
                        .find(|worktree| worktree.id == db_worktree.id as u64);

                    // File entries
                    {
                        // With a client-reported scan id, select only rows with a newer
                        // scan id; otherwise select every non-deleted row.
                        let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
                            worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
                        } else {
                            worktree_entry::Column::IsDeleted.eq(false)
                        };

                        let mut db_entries = worktree_entry::Entity::find()
                            .filter(
                                Condition::all()
                                    .add(worktree_entry::Column::ProjectId.eq(project.id))
                                    .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
                                    .add(entry_filter),
                            )
                            .stream(&*tx)
                            .await?;

                        while let Some(db_entry) = db_entries.next().await {
                            let db_entry = db_entry?;
                            if db_entry.is_deleted {
                                worktree.removed_entries.push(db_entry.id as u64);
                            } else {
                                worktree.updated_entries.push(proto::Entry {
                                    id: db_entry.id as u64,
                                    is_dir: db_entry.is_dir,
                                    path: db_entry.path,
                                    inode: db_entry.inode as u64,
                                    mtime: Some(proto::Timestamp {
                                        seconds: db_entry.mtime_seconds as u64,
                                        nanos: db_entry.mtime_nanos as u32,
                                    }),
                                    is_symlink: db_entry.is_symlink,
                                    is_ignored: db_entry.is_ignored,
                                    is_external: db_entry.is_external,
                                    git_status: db_entry.git_status.map(|status| status as i32),
                                });
                            }
                        }
                    }

                    // Repository Entries
                    {
                        // Same selection rule as for file entries above.
                        let repository_entry_filter =
                            if let Some(rejoined_worktree) = rejoined_worktree {
                                worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
                            } else {
                                worktree_repository::Column::IsDeleted.eq(false)
                            };

                        let mut db_repositories = worktree_repository::Entity::find()
                            .filter(
                                Condition::all()
                                    .add(worktree_repository::Column::ProjectId.eq(project.id))
                                    .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
                                    .add(repository_entry_filter),
                            )
                            .stream(&*tx)
                            .await?;

                        while let Some(db_repository) = db_repositories.next().await {
                            let db_repository = db_repository?;
                            if db_repository.is_deleted {
                                worktree
                                    .removed_repositories
                                    .push(db_repository.work_directory_id as u64);
                            } else {
                                worktree.updated_repositories.push(proto::RepositoryEntry {
                                    work_directory_id: db_repository.work_directory_id as u64,
                                    branch: db_repository.branch,
                                });
                            }
                        }
                    }

                    worktrees.push(worktree);
                }

                let language_servers = project
                    .find_related(language_server::Entity)
                    .all(&*tx)
                    .await?
                    .into_iter()
                    .map(|language_server| proto::LanguageServer {
                        id: language_server.id as u64,
                        name: language_server.name,
                    })
                    .collect::<Vec<_>>();

                // Attach each settings file to its worktree; files whose worktree is
                // not in `worktrees` are ignored.
                {
                    let mut db_settings_files = worktree_settings_file::Entity::find()
                        .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
                        .stream(&*tx)
                        .await?;
                    while let Some(db_settings_file) = db_settings_files.next().await {
                        let db_settings_file = db_settings_file?;
                        if let Some(worktree) = worktrees
                            .iter_mut()
                            .find(|w| w.id == db_settings_file.worktree_id as u64)
                        {
                            worktree.settings_files.push(WorktreeSettingsFile {
                                path: db_settings_file.path,
                                content: db_settings_file.content,
                            });
                        }
                    }
                }

                let mut collaborators = project
                    .find_related(project_collaborator::Entity)
                    .all(&*tx)
                    .await?;
                // Skip projects in which the user is not a collaborator.
                let self_collaborator = if let Some(self_collaborator_ix) = collaborators
                    .iter()
                    .position(|collaborator| collaborator.user_id == user_id)
                {
                    collaborators.swap_remove(self_collaborator_ix)
                } else {
                    continue;
                };
                let old_connection_id = self_collaborator.connection();
                project_collaborator::Entity::update(project_collaborator::ActiveModel {
                    connection_id: ActiveValue::set(connection.id as i32),
                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
                    ..self_collaborator.into_active_model()
                })
                .exec(&*tx)
                .await?;

                let collaborators = collaborators
                    .into_iter()
                    .map(|collaborator| ProjectCollaborator {
                        connection_id: collaborator.connection(),
                        user_id: collaborator.user_id,
                        replica_id: collaborator.replica_id,
                        is_host: collaborator.is_host,
                    })
                    .collect::<Vec<_>>();

                rejoined_projects.push(RejoinedProject {
                    id: project_id,
                    old_connection_id,
                    collaborators,
                    worktrees,
                    language_servers,
                });
            }

            let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
            let channel_members = if let Some(channel_id) = channel_id {
                self.get_channel_members_internal(channel_id, &tx).await?
            } else {
                Vec::new()
            };

            Ok(RejoinedRoom {
                room,
                channel_id,
                channel_members,
                rejoined_projects,
                reshared_projects,
            })
        })
        .await
    }
660
    /// Removes the participant answering on `connection` from its room.
    ///
    /// Returns `Ok(None)` when no participant is answering on that connection.
    /// Otherwise, within one transaction, it:
    /// - deletes the participant row,
    /// - cancels unanswered calls placed by the leaving user,
    /// - records, for each project the connection collaborated on, the host and
    ///   the other collaborators' connections,
    /// - deletes the connection's collaborator rows and the projects it hosted
    ///   in the room,
    /// - deletes the room itself if it has no participants left and is not
    ///   attached to a channel.
    ///
    /// The returned [`LeftRoom`] carries the room as read after these changes;
    /// its `deleted` flag reports whether the room row was actually removed.
    pub async fn leave_room(
        &self,
        connection: ConnectionId,
    ) -> Result<Option<RoomGuard<LeftRoom>>> {
        self.optional_room_transaction(|tx| async move {
            let leaving_participant = room_participant::Entity::find()
                .filter(
                    Condition::all()
                        .add(
                            room_participant::Column::AnsweringConnectionId
                                .eq(connection.id as i32),
                        )
                        .add(
                            room_participant::Column::AnsweringConnectionServerId
                                .eq(connection.owner_id as i32),
                        ),
                )
                .one(&*tx)
                .await?;

            if let Some(leaving_participant) = leaving_participant {
                // Leave room.
                let room_id = leaving_participant.room_id;
                room_participant::Entity::delete_by_id(leaving_participant.id)
                    .exec(&*tx)
                    .await?;

                // Cancel pending calls initiated by the leaving user.
                let called_participants = room_participant::Entity::find()
                    .filter(
                        Condition::all()
                            .add(
                                room_participant::Column::CallingUserId
                                    .eq(leaving_participant.user_id),
                            )
                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
                    )
                    .all(&*tx)
                    .await?;
                room_participant::Entity::delete_many()
                    .filter(
                        room_participant::Column::Id
                            .is_in(called_participants.iter().map(|participant| participant.id)),
                    )
                    .exec(&*tx)
                    .await?;
                let canceled_calls_to_user_ids = called_participants
                    .into_iter()
                    .map(|participant| participant.user_id)
                    .collect();

                // Detect left projects.
                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
                enum QueryProjectIds {
                    ProjectId,
                }
                // Ids of every project this connection is a collaborator on.
                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
                    .select_only()
                    .column_as(
                        project_collaborator::Column::ProjectId,
                        QueryProjectIds::ProjectId,
                    )
                    .filter(
                        Condition::all()
                            .add(
                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
                            )
                            .add(
                                project_collaborator::Column::ConnectionServerId
                                    .eq(connection.owner_id as i32),
                            ),
                    )
                    .into_values::<_, QueryProjectIds>()
                    .all(&*tx)
                    .await?;
                // Group all collaborators of those projects by project id.
                let mut left_projects = HashMap::default();
                let mut collaborators = project_collaborator::Entity::find()
                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
                    .stream(&*tx)
                    .await?;
                while let Some(collaborator) = collaborators.next().await {
                    let collaborator = collaborator?;
                    let left_project =
                        left_projects
                            .entry(collaborator.project_id)
                            .or_insert(LeftProject {
                                id: collaborator.project_id,
                                host_user_id: Default::default(),
                                connection_ids: Default::default(),
                                host_connection_id: Default::default(),
                            });

                    // Collect every collaborator connection except the leaving one.
                    let collaborator_connection_id = collaborator.connection();
                    if collaborator_connection_id != connection {
                        left_project.connection_ids.push(collaborator_connection_id);
                    }

                    // The host fields start as defaults and are filled in when the
                    // host's row is seen.
                    if collaborator.is_host {
                        left_project.host_user_id = collaborator.user_id;
                        left_project.host_connection_id = collaborator_connection_id;
                    }
                }
                // Release the stream before issuing further statements on `tx`.
                drop(collaborators);

                // Leave projects.
                project_collaborator::Entity::delete_many()
                    .filter(
                        Condition::all()
                            .add(
                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
                            )
                            .add(
                                project_collaborator::Column::ConnectionServerId
                                    .eq(connection.owner_id as i32),
                            ),
                    )
                    .exec(&*tx)
                    .await?;

                // Unshare projects.
                project::Entity::delete_many()
                    .filter(
                        Condition::all()
                            .add(project::Column::RoomId.eq(room_id))
                            .add(project::Column::HostConnectionId.eq(connection.id as i32))
                            .add(
                                project::Column::HostConnectionServerId
                                    .eq(connection.owner_id as i32),
                            ),
                    )
                    .exec(&*tx)
                    .await?;

                let (channel_id, room) = self.get_channel_room(room_id, &tx).await?;
                // An empty room is deleted only when it has no channel; `deleted`
                // reflects whether a row was actually removed.
                let deleted = if room.participants.is_empty() {
                    let result = room::Entity::delete_by_id(room_id)
                        .filter(room::Column::ChannelId.is_null())
                        .exec(&*tx)
                        .await?;
                    result.rows_affected > 0
                } else {
                    false
                };

                let channel_members = if let Some(channel_id) = channel_id {
                    self.get_channel_members_internal(channel_id, &tx).await?
                } else {
                    Vec::new()
                };
                let left_room = LeftRoom {
                    room,
                    channel_id,
                    channel_members,
                    left_projects,
                    canceled_calls_to_user_ids,
                    deleted,
                };

                // No participants remain, so remove this room's entry from `self.rooms`.
                if left_room.room.participants.is_empty() {
                    self.rooms.remove(&room_id);
                }

                Ok(Some((room_id, left_room)))
            } else {
                Ok(None)
            }
        })
        .await
    }
830
831 pub async fn update_room_participant_location(
832 &self,
833 room_id: RoomId,
834 connection: ConnectionId,
835 location: proto::ParticipantLocation,
836 ) -> Result<RoomGuard<proto::Room>> {
837 self.room_transaction(room_id, |tx| async {
838 let tx = tx;
839 let location_kind;
840 let location_project_id;
841 match location
842 .variant
843 .as_ref()
844 .ok_or_else(|| anyhow!("invalid location"))?
845 {
846 proto::participant_location::Variant::SharedProject(project) => {
847 location_kind = 0;
848 location_project_id = Some(ProjectId::from_proto(project.id));
849 }
850 proto::participant_location::Variant::UnsharedProject(_) => {
851 location_kind = 1;
852 location_project_id = None;
853 }
854 proto::participant_location::Variant::External(_) => {
855 location_kind = 2;
856 location_project_id = None;
857 }
858 }
859
860 let result = room_participant::Entity::update_many()
861 .filter(
862 Condition::all()
863 .add(room_participant::Column::RoomId.eq(room_id))
864 .add(
865 room_participant::Column::AnsweringConnectionId
866 .eq(connection.id as i32),
867 )
868 .add(
869 room_participant::Column::AnsweringConnectionServerId
870 .eq(connection.owner_id as i32),
871 ),
872 )
873 .set(room_participant::ActiveModel {
874 location_kind: ActiveValue::set(Some(location_kind)),
875 location_project_id: ActiveValue::set(location_project_id),
876 ..Default::default()
877 })
878 .exec(&*tx)
879 .await?;
880
881 if result.rows_affected == 1 {
882 let room = self.get_room(room_id, &tx).await?;
883 Ok(room)
884 } else {
885 Err(anyhow!("could not update room participant location"))?
886 }
887 })
888 .await
889 }
890
891 pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
892 self.transaction(|tx| async move {
893 self.room_connection_lost(connection, &*tx).await?;
894 self.channel_buffer_connection_lost(connection, &*tx)
895 .await?;
896 self.channel_chat_connection_lost(connection, &*tx).await?;
897 Ok(())
898 })
899 .await
900 }
901
902 pub async fn room_connection_lost(
903 &self,
904 connection: ConnectionId,
905 tx: &DatabaseTransaction,
906 ) -> Result<()> {
907 let participant = room_participant::Entity::find()
908 .filter(
909 Condition::all()
910 .add(room_participant::Column::AnsweringConnectionId.eq(connection.id as i32))
911 .add(
912 room_participant::Column::AnsweringConnectionServerId
913 .eq(connection.owner_id as i32),
914 ),
915 )
916 .one(&*tx)
917 .await?;
918
919 if let Some(participant) = participant {
920 room_participant::Entity::update(room_participant::ActiveModel {
921 answering_connection_lost: ActiveValue::set(true),
922 ..participant.into_active_model()
923 })
924 .exec(&*tx)
925 .await?;
926 }
927 Ok(())
928 }
929
930 fn build_incoming_call(
931 room: &proto::Room,
932 called_user_id: UserId,
933 ) -> Option<proto::IncomingCall> {
934 let pending_participant = room
935 .pending_participants
936 .iter()
937 .find(|participant| participant.user_id == called_user_id.to_proto())?;
938
939 Some(proto::IncomingCall {
940 room_id: room.id,
941 calling_user_id: pending_participant.calling_user_id,
942 participant_user_ids: room
943 .participants
944 .iter()
945 .map(|participant| participant.user_id)
946 .collect(),
947 initial_project: room.participants.iter().find_map(|participant| {
948 let initial_project_id = pending_participant.initial_project_id?;
949 participant
950 .projects
951 .iter()
952 .find(|project| project.id == initial_project_id)
953 .cloned()
954 }),
955 })
956 }
957
958 pub async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
959 let (_, room) = self.get_channel_room(room_id, tx).await?;
960 Ok(room)
961 }
962
    /// Loads the room with `room_id` and assembles its protobuf representation.
    ///
    /// Participant rows with both an answering connection id and server id
    /// become `participants`; all other rows become `pending_participants`.
    /// Each project in the room is attached to the participant whose
    /// connection hosts it, along with the root names of its visible
    /// worktrees. Followers are copied over as-is.
    ///
    /// Returns the room's channel id (if any) together with the room, and
    /// fails with "could not find room" when the room does not exist.
    async fn get_channel_room(
        &self,
        room_id: RoomId,
        tx: &DatabaseTransaction,
    ) -> Result<(Option<ChannelId>, proto::Room)> {
        let db_room = room::Entity::find_by_id(room_id)
            .one(tx)
            .await?
            .ok_or_else(|| anyhow!("could not find room"))?;

        let mut db_participants = db_room
            .find_related(room_participant::Entity)
            .stream(tx)
            .await?;
        // Answered participants are keyed by their connection so that projects
        // can be matched to their host below.
        let mut participants = HashMap::default();
        let mut pending_participants = Vec::new();
        while let Some(db_participant) = db_participants.next().await {
            let db_participant = db_participant?;
            if let Some((answering_connection_id, answering_connection_server_id)) = db_participant
                .answering_connection_id
                .zip(db_participant.answering_connection_server_id)
            {
                // Decode the stored location kind: 0 with a project id is a shared
                // project, 1 is an unshared project, anything else is external.
                let location = match (
                    db_participant.location_kind,
                    db_participant.location_project_id,
                ) {
                    (Some(0), Some(project_id)) => {
                        Some(proto::participant_location::Variant::SharedProject(
                            proto::participant_location::SharedProject {
                                id: project_id.to_proto(),
                            },
                        ))
                    }
                    (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
                        Default::default(),
                    )),
                    _ => Some(proto::participant_location::Variant::External(
                        Default::default(),
                    )),
                };

                let answering_connection = ConnectionId {
                    owner_id: answering_connection_server_id.0 as u32,
                    id: answering_connection_id as u32,
                };
                participants.insert(
                    answering_connection,
                    proto::Participant {
                        user_id: db_participant.user_id.to_proto(),
                        peer_id: Some(answering_connection.into()),
                        projects: Default::default(),
                        location: Some(proto::ParticipantLocation { variant: location }),
                    },
                );
            } else {
                // No answering connection yet: the call is still pending.
                pending_participants.push(proto::PendingParticipant {
                    user_id: db_participant.user_id.to_proto(),
                    calling_user_id: db_participant.calling_user_id.to_proto(),
                    initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
                });
            }
        }
        // Release the participant stream before opening the next one on `tx`.
        drop(db_participants);

        let mut db_projects = db_room
            .find_related(project::Entity)
            .find_with_related(worktree::Entity)
            .stream(tx)
            .await?;

        // Each row pairs a project with at most one worktree, so the same project
        // can appear in several rows.
        while let Some(row) = db_projects.next().await {
            let (db_project, db_worktree) = row?;
            let host_connection = db_project.host_connection()?;
            // Projects whose host connection is not an answered participant are skipped.
            if let Some(participant) = participants.get_mut(&host_connection) {
                // Reuse the project entry if an earlier row already added it.
                let project = if let Some(project) = participant
                    .projects
                    .iter_mut()
                    .find(|project| project.id == db_project.id.to_proto())
                {
                    project
                } else {
                    participant.projects.push(proto::ParticipantProject {
                        id: db_project.id.to_proto(),
                        worktree_root_names: Default::default(),
                    });
                    participant.projects.last_mut().unwrap()
                };

                // Only visible worktrees contribute a root name.
                if let Some(db_worktree) = db_worktree {
                    if db_worktree.visible {
                        project.worktree_root_names.push(db_worktree.root_name);
                    }
                }
            }
        }
        drop(db_projects);

        let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
        let mut followers = Vec::new();
        while let Some(db_follower) = db_followers.next().await {
            let db_follower = db_follower?;
            followers.push(proto::Follower {
                leader_id: Some(db_follower.leader_connection().into()),
                follower_id: Some(db_follower.follower_connection().into()),
                project_id: db_follower.project_id.to_proto(),
            });
        }

        Ok((
            db_room.channel_id,
            proto::Room {
                id: db_room.id.to_proto(),
                live_kit_room: db_room.live_kit_room,
                participants: participants.into_values().collect(),
                pending_participants,
                followers,
            },
        ))
    }
1082}