1use super::*;
2
3impl Database {
4 pub async fn clear_stale_room_participants(
5 &self,
6 room_id: RoomId,
7 new_server_id: ServerId,
8 ) -> Result<RoomGuard<RefreshedRoom>> {
9 self.room_transaction(room_id, |tx| async move {
10 let stale_participant_filter = Condition::all()
11 .add(room_participant::Column::RoomId.eq(room_id))
12 .add(room_participant::Column::AnsweringConnectionId.is_not_null())
13 .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
14
15 let stale_participant_user_ids = room_participant::Entity::find()
16 .filter(stale_participant_filter.clone())
17 .all(&*tx)
18 .await?
19 .into_iter()
20 .map(|participant| participant.user_id)
21 .collect::<Vec<_>>();
22
23 // Delete participants who failed to reconnect and cancel their calls.
24 let mut canceled_calls_to_user_ids = Vec::new();
25 room_participant::Entity::delete_many()
26 .filter(stale_participant_filter)
27 .exec(&*tx)
28 .await?;
29 let called_participants = room_participant::Entity::find()
30 .filter(
31 Condition::all()
32 .add(
33 room_participant::Column::CallingUserId
34 .is_in(stale_participant_user_ids.iter().copied()),
35 )
36 .add(room_participant::Column::AnsweringConnectionId.is_null()),
37 )
38 .all(&*tx)
39 .await?;
40 room_participant::Entity::delete_many()
41 .filter(
42 room_participant::Column::Id
43 .is_in(called_participants.iter().map(|participant| participant.id)),
44 )
45 .exec(&*tx)
46 .await?;
47 canceled_calls_to_user_ids.extend(
48 called_participants
49 .into_iter()
50 .map(|participant| participant.user_id),
51 );
52
53 let (channel, room) = self.get_channel_room(room_id, &tx).await?;
54 let channel_members;
55 if let Some(channel) = &channel {
56 channel_members = self.get_channel_participants(channel, &tx).await?;
57 } else {
58 channel_members = Vec::new();
59
60 // Delete the room if it becomes empty.
61 if room.participants.is_empty() {
62 project::Entity::delete_many()
63 .filter(project::Column::RoomId.eq(room_id))
64 .exec(&*tx)
65 .await?;
66 room::Entity::delete_by_id(room_id).exec(&*tx).await?;
67 }
68 };
69
70 Ok(RefreshedRoom {
71 room,
72 channel_id: channel.map(|channel| channel.id),
73 channel_members,
74 stale_participant_user_ids,
75 canceled_calls_to_user_ids,
76 })
77 })
78 .await
79 }
80
81 pub async fn incoming_call_for_user(
82 &self,
83 user_id: UserId,
84 ) -> Result<Option<proto::IncomingCall>> {
85 self.transaction(|tx| async move {
86 let pending_participant = room_participant::Entity::find()
87 .filter(
88 room_participant::Column::UserId
89 .eq(user_id)
90 .and(room_participant::Column::AnsweringConnectionId.is_null()),
91 )
92 .one(&*tx)
93 .await?;
94
95 if let Some(pending_participant) = pending_participant {
96 let room = self.get_room(pending_participant.room_id, &tx).await?;
97 Ok(Self::build_incoming_call(&room, user_id))
98 } else {
99 Ok(None)
100 }
101 })
102 .await
103 }
104
105 pub async fn create_room(
106 &self,
107 user_id: UserId,
108 connection: ConnectionId,
109 live_kit_room: &str,
110 release_channel: &str,
111 ) -> Result<proto::Room> {
112 self.transaction(|tx| async move {
113 let room = room::ActiveModel {
114 live_kit_room: ActiveValue::set(live_kit_room.into()),
115 enviroment: ActiveValue::set(Some(release_channel.to_string())),
116 ..Default::default()
117 }
118 .insert(&*tx)
119 .await?;
120 room_participant::ActiveModel {
121 room_id: ActiveValue::set(room.id),
122 user_id: ActiveValue::set(user_id),
123 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
124 answering_connection_server_id: ActiveValue::set(Some(ServerId(
125 connection.owner_id as i32,
126 ))),
127 answering_connection_lost: ActiveValue::set(false),
128 calling_user_id: ActiveValue::set(user_id),
129 calling_connection_id: ActiveValue::set(connection.id as i32),
130 calling_connection_server_id: ActiveValue::set(Some(ServerId(
131 connection.owner_id as i32,
132 ))),
133 participant_index: ActiveValue::set(Some(0)),
134 role: ActiveValue::set(Some(ChannelRole::Admin)),
135
136 id: ActiveValue::NotSet,
137 location_kind: ActiveValue::NotSet,
138 location_project_id: ActiveValue::NotSet,
139 initial_project_id: ActiveValue::NotSet,
140 }
141 .insert(&*tx)
142 .await?;
143
144 let room = self.get_room(room.id, &tx).await?;
145 Ok(room)
146 })
147 .await
148 }
149
150 pub async fn call(
151 &self,
152 room_id: RoomId,
153 calling_user_id: UserId,
154 calling_connection: ConnectionId,
155 called_user_id: UserId,
156 initial_project_id: Option<ProjectId>,
157 ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
158 self.room_transaction(room_id, |tx| async move {
159 let caller = room_participant::Entity::find()
160 .filter(
161 room_participant::Column::UserId
162 .eq(calling_user_id)
163 .and(room_participant::Column::RoomId.eq(room_id)),
164 )
165 .one(&*tx)
166 .await?
167 .ok_or_else(|| anyhow!("user is not in the room"))?;
168
169 let called_user_role = match caller.role.unwrap_or(ChannelRole::Member) {
170 ChannelRole::Admin | ChannelRole::Member => ChannelRole::Member,
171 ChannelRole::Guest => ChannelRole::Guest,
172 ChannelRole::Banned => return Err(anyhow!("banned users cannot invite").into()),
173 };
174
175 room_participant::ActiveModel {
176 room_id: ActiveValue::set(room_id),
177 user_id: ActiveValue::set(called_user_id),
178 answering_connection_lost: ActiveValue::set(false),
179 participant_index: ActiveValue::NotSet,
180 calling_user_id: ActiveValue::set(calling_user_id),
181 calling_connection_id: ActiveValue::set(calling_connection.id as i32),
182 calling_connection_server_id: ActiveValue::set(Some(ServerId(
183 calling_connection.owner_id as i32,
184 ))),
185 initial_project_id: ActiveValue::set(initial_project_id),
186 role: ActiveValue::set(Some(called_user_role)),
187
188 id: ActiveValue::NotSet,
189 answering_connection_id: ActiveValue::NotSet,
190 answering_connection_server_id: ActiveValue::NotSet,
191 location_kind: ActiveValue::NotSet,
192 location_project_id: ActiveValue::NotSet,
193 }
194 .insert(&*tx)
195 .await?;
196
197 let room = self.get_room(room_id, &tx).await?;
198 let incoming_call = Self::build_incoming_call(&room, called_user_id)
199 .ok_or_else(|| anyhow!("failed to build incoming call"))?;
200 Ok((room, incoming_call))
201 })
202 .await
203 }
204
205 pub async fn call_failed(
206 &self,
207 room_id: RoomId,
208 called_user_id: UserId,
209 ) -> Result<RoomGuard<proto::Room>> {
210 self.room_transaction(room_id, |tx| async move {
211 room_participant::Entity::delete_many()
212 .filter(
213 room_participant::Column::RoomId
214 .eq(room_id)
215 .and(room_participant::Column::UserId.eq(called_user_id)),
216 )
217 .exec(&*tx)
218 .await?;
219 let room = self.get_room(room_id, &tx).await?;
220 Ok(room)
221 })
222 .await
223 }
224
225 pub async fn decline_call(
226 &self,
227 expected_room_id: Option<RoomId>,
228 user_id: UserId,
229 ) -> Result<Option<RoomGuard<proto::Room>>> {
230 self.optional_room_transaction(|tx| async move {
231 let mut filter = Condition::all()
232 .add(room_participant::Column::UserId.eq(user_id))
233 .add(room_participant::Column::AnsweringConnectionId.is_null());
234 if let Some(room_id) = expected_room_id {
235 filter = filter.add(room_participant::Column::RoomId.eq(room_id));
236 }
237 let participant = room_participant::Entity::find()
238 .filter(filter)
239 .one(&*tx)
240 .await?;
241
242 let participant = if let Some(participant) = participant {
243 participant
244 } else if expected_room_id.is_some() {
245 return Err(anyhow!("could not find call to decline"))?;
246 } else {
247 return Ok(None);
248 };
249
250 let room_id = participant.room_id;
251 room_participant::Entity::delete(participant.into_active_model())
252 .exec(&*tx)
253 .await?;
254
255 let room = self.get_room(room_id, &tx).await?;
256 Ok(Some((room_id, room)))
257 })
258 .await
259 }
260
261 pub async fn cancel_call(
262 &self,
263 room_id: RoomId,
264 calling_connection: ConnectionId,
265 called_user_id: UserId,
266 ) -> Result<RoomGuard<proto::Room>> {
267 self.room_transaction(room_id, |tx| async move {
268 let participant = room_participant::Entity::find()
269 .filter(
270 Condition::all()
271 .add(room_participant::Column::UserId.eq(called_user_id))
272 .add(room_participant::Column::RoomId.eq(room_id))
273 .add(
274 room_participant::Column::CallingConnectionId
275 .eq(calling_connection.id as i32),
276 )
277 .add(
278 room_participant::Column::CallingConnectionServerId
279 .eq(calling_connection.owner_id as i32),
280 )
281 .add(room_participant::Column::AnsweringConnectionId.is_null()),
282 )
283 .one(&*tx)
284 .await?
285 .ok_or_else(|| anyhow!("no call to cancel"))?;
286
287 room_participant::Entity::delete(participant.into_active_model())
288 .exec(&*tx)
289 .await?;
290
291 let room = self.get_room(room_id, &tx).await?;
292 Ok(room)
293 })
294 .await
295 }
296
297 pub async fn join_room(
298 &self,
299 room_id: RoomId,
300 user_id: UserId,
301 connection: ConnectionId,
302 enviroment: &str,
303 ) -> Result<RoomGuard<JoinRoom>> {
304 self.room_transaction(room_id, |tx| async move {
305 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
306 enum QueryChannelIdAndEnviroment {
307 ChannelId,
308 Enviroment,
309 }
310
311 let (channel_id, release_channel): (Option<ChannelId>, Option<String>) =
312 room::Entity::find()
313 .select_only()
314 .column(room::Column::ChannelId)
315 .column(room::Column::Enviroment)
316 .filter(room::Column::Id.eq(room_id))
317 .into_values::<_, QueryChannelIdAndEnviroment>()
318 .one(&*tx)
319 .await?
320 .ok_or_else(|| anyhow!("no such room"))?;
321
322 if let Some(release_channel) = release_channel {
323 if &release_channel != enviroment {
324 Err(anyhow!("must join using the {} release", release_channel))?;
325 }
326 }
327
328 if channel_id.is_some() {
329 Err(anyhow!("tried to join channel call directly"))?
330 }
331
332 let participant_index = self
333 .get_next_participant_index_internal(room_id, &*tx)
334 .await?;
335
336 let result = room_participant::Entity::update_many()
337 .filter(
338 Condition::all()
339 .add(room_participant::Column::RoomId.eq(room_id))
340 .add(room_participant::Column::UserId.eq(user_id))
341 .add(room_participant::Column::AnsweringConnectionId.is_null()),
342 )
343 .set(room_participant::ActiveModel {
344 participant_index: ActiveValue::Set(Some(participant_index)),
345 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
346 answering_connection_server_id: ActiveValue::set(Some(ServerId(
347 connection.owner_id as i32,
348 ))),
349 answering_connection_lost: ActiveValue::set(false),
350 ..Default::default()
351 })
352 .exec(&*tx)
353 .await?;
354 if result.rows_affected == 0 {
355 Err(anyhow!("room does not exist or was already joined"))?;
356 }
357
358 let room = self.get_room(room_id, &tx).await?;
359 Ok(JoinRoom {
360 room,
361 channel_id: None,
362 channel_members: vec![],
363 })
364 })
365 .await
366 }
367
368 async fn get_next_participant_index_internal(
369 &self,
370 room_id: RoomId,
371 tx: &DatabaseTransaction,
372 ) -> Result<i32> {
373 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
374 enum QueryParticipantIndices {
375 ParticipantIndex,
376 }
377 let existing_participant_indices: Vec<i32> = room_participant::Entity::find()
378 .filter(
379 room_participant::Column::RoomId
380 .eq(room_id)
381 .and(room_participant::Column::ParticipantIndex.is_not_null()),
382 )
383 .select_only()
384 .column(room_participant::Column::ParticipantIndex)
385 .into_values::<_, QueryParticipantIndices>()
386 .all(&*tx)
387 .await?;
388
389 let mut participant_index = 0;
390 while existing_participant_indices.contains(&participant_index) {
391 participant_index += 1;
392 }
393
394 Ok(participant_index)
395 }
396
397 pub async fn channel_id_for_room(&self, room_id: RoomId) -> Result<Option<ChannelId>> {
398 self.transaction(|tx| async move {
399 let room: Option<room::Model> = room::Entity::find()
400 .filter(room::Column::Id.eq(room_id))
401 .one(&*tx)
402 .await?;
403
404 Ok(room.and_then(|room| room.channel_id))
405 })
406 .await
407 }
408
409 pub(crate) async fn join_channel_room_internal(
410 &self,
411 room_id: RoomId,
412 user_id: UserId,
413 connection: ConnectionId,
414 role: ChannelRole,
415 tx: &DatabaseTransaction,
416 ) -> Result<JoinRoom> {
417 let participant_index = self
418 .get_next_participant_index_internal(room_id, &*tx)
419 .await?;
420
421 room_participant::Entity::insert_many([room_participant::ActiveModel {
422 room_id: ActiveValue::set(room_id),
423 user_id: ActiveValue::set(user_id),
424 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
425 answering_connection_server_id: ActiveValue::set(Some(ServerId(
426 connection.owner_id as i32,
427 ))),
428 answering_connection_lost: ActiveValue::set(false),
429 calling_user_id: ActiveValue::set(user_id),
430 calling_connection_id: ActiveValue::set(connection.id as i32),
431 calling_connection_server_id: ActiveValue::set(Some(ServerId(
432 connection.owner_id as i32,
433 ))),
434 participant_index: ActiveValue::Set(Some(participant_index)),
435 role: ActiveValue::set(Some(role)),
436 id: ActiveValue::NotSet,
437 location_kind: ActiveValue::NotSet,
438 location_project_id: ActiveValue::NotSet,
439 initial_project_id: ActiveValue::NotSet,
440 }])
441 .on_conflict(
442 OnConflict::columns([room_participant::Column::UserId])
443 .update_columns([
444 room_participant::Column::AnsweringConnectionId,
445 room_participant::Column::AnsweringConnectionServerId,
446 room_participant::Column::AnsweringConnectionLost,
447 room_participant::Column::ParticipantIndex,
448 room_participant::Column::Role,
449 ])
450 .to_owned(),
451 )
452 .exec(&*tx)
453 .await?;
454
455 let (channel, room) = self.get_channel_room(room_id, &tx).await?;
456 let channel = channel.ok_or_else(|| anyhow!("no channel for room"))?;
457 let channel_members = self.get_channel_participants(&channel, &*tx).await?;
458 Ok(JoinRoom {
459 room,
460 channel_id: Some(channel.id),
461 channel_members,
462 })
463 }
464
    /// Re-attaches `user_id` to a room on a new `connection` and reconciles the
    /// projects listed in the `rejoin_room` request.
    ///
    /// Steps, all inside one room transaction:
    /// 1. Point the user's participant row at `connection`. Only a row that was
    ///    already answered and is either marked as lost or owned by a different
    ///    server is updated; if no row is updated the call fails.
    /// 2. For every entry in `rejoin_room.reshared_projects`: check that the project
    ///    exists and is hosted by the user, move the project's host connection and
    ///    the host collaborator row to `connection`, and update its worktrees.
    /// 3. Delete the user's hosted projects in this room that were not reshared.
    /// 4. For every entry in `rejoin_room.rejoined_projects`: collect worktree
    ///    entries, repositories, settings files and language servers, and move the
    ///    user's collaborator row to `connection`. Projects that no longer exist,
    ///    or in which the user is not a collaborator, are skipped.
    ///
    /// Returns a [`RejoinedRoom`] with the room, the channel id and channel
    /// participants (when the room has a channel), and both project lists.
    pub async fn rejoin_room(
        &self,
        rejoin_room: proto::RejoinRoom,
        user_id: UserId,
        connection: ConnectionId,
    ) -> Result<RoomGuard<RejoinedRoom>> {
        let room_id = RoomId::from_proto(rejoin_room.id);
        // NOTE(review): this is a plain `async` block (not `async move`), so
        // `rejoin_room` is only borrowed by it; rebinding `tx` below moves the
        // handle into the block. Presumably this lets the closure be invoked more
        // than once — confirm against `room_transaction`.
        self.room_transaction(room_id, |tx| async {
            let tx = tx;
            // Only rows that were answered before and are either flagged as lost
            // or owned by another server are taken over by the new connection.
            let participant_update = room_participant::Entity::update_many()
                .filter(
                    Condition::all()
                        .add(room_participant::Column::RoomId.eq(room_id))
                        .add(room_participant::Column::UserId.eq(user_id))
                        .add(room_participant::Column::AnsweringConnectionId.is_not_null())
                        .add(
                            Condition::any()
                                .add(room_participant::Column::AnsweringConnectionLost.eq(true))
                                .add(
                                    room_participant::Column::AnsweringConnectionServerId
                                        .ne(connection.owner_id as i32),
                                ),
                        ),
                )
                .set(room_participant::ActiveModel {
                    answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
                    answering_connection_server_id: ActiveValue::set(Some(ServerId(
                        connection.owner_id as i32,
                    ))),
                    answering_connection_lost: ActiveValue::set(false),
                    ..Default::default()
                })
                .exec(&*tx)
                .await?;
            if participant_update.rows_affected == 0 {
                return Err(anyhow!("room does not exist or was already joined"))?;
            }

            // Projects the user hosts and is sharing again from the new connection.
            let mut reshared_projects = Vec::new();
            for reshared_project in &rejoin_room.reshared_projects {
                let project_id = ProjectId::from_proto(reshared_project.project_id);
                let project = project::Entity::find_by_id(project_id)
                    .one(&*tx)
                    .await?
                    .ok_or_else(|| anyhow!("project does not exist"))?;
                if project.host_user_id != user_id {
                    return Err(anyhow!("no such project"))?;
                }

                let mut collaborators = project
                    .find_related(project_collaborator::Entity)
                    .all(&*tx)
                    .await?;
                let host_ix = collaborators
                    .iter()
                    .position(|collaborator| {
                        collaborator.user_id == user_id && collaborator.is_host
                    })
                    .ok_or_else(|| anyhow!("host not found among collaborators"))?;
                // Take the host out so `collaborators` holds only the other members.
                let host = collaborators.swap_remove(host_ix);
                let old_connection_id = host.connection();

                project::Entity::update(project::ActiveModel {
                    host_connection_id: ActiveValue::set(Some(connection.id as i32)),
                    host_connection_server_id: ActiveValue::set(Some(ServerId(
                        connection.owner_id as i32,
                    ))),
                    ..project.into_active_model()
                })
                .exec(&*tx)
                .await?;
                project_collaborator::Entity::update(project_collaborator::ActiveModel {
                    connection_id: ActiveValue::set(connection.id as i32),
                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
                    ..host.into_active_model()
                })
                .exec(&*tx)
                .await?;

                self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
                    .await?;

                reshared_projects.push(ResharedProject {
                    id: project_id,
                    old_connection_id,
                    collaborators: collaborators
                        .iter()
                        .map(|collaborator| ProjectCollaborator {
                            connection_id: collaborator.connection(),
                            user_id: collaborator.user_id,
                            replica_id: collaborator.replica_id,
                            is_host: collaborator.is_host,
                        })
                        .collect(),
                    worktrees: reshared_project.worktrees.clone(),
                });
            }

            // Drop the user's hosted projects in this room that were not reshared.
            project::Entity::delete_many()
                .filter(
                    Condition::all()
                        .add(project::Column::RoomId.eq(room_id))
                        .add(project::Column::HostUserId.eq(user_id))
                        .add(
                            project::Column::Id
                                .is_not_in(reshared_projects.iter().map(|project| project.id)),
                        ),
                )
                .exec(&*tx)
                .await?;

            // Projects the user rejoins; gather what is sent back for each one.
            let mut rejoined_projects = Vec::new();
            for rejoined_project in &rejoin_room.rejoined_projects {
                let project_id = ProjectId::from_proto(rejoined_project.id);
                // A project that no longer exists is silently skipped.
                let Some(project) = project::Entity::find_by_id(project_id).one(&*tx).await? else {
                    continue;
                };

                let mut worktrees = Vec::new();
                let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
                for db_worktree in db_worktrees {
                    let mut worktree = RejoinedWorktree {
                        id: db_worktree.id as u64,
                        abs_path: db_worktree.abs_path,
                        root_name: db_worktree.root_name,
                        visible: db_worktree.visible,
                        updated_entries: Default::default(),
                        removed_entries: Default::default(),
                        updated_repositories: Default::default(),
                        removed_repositories: Default::default(),
                        diagnostic_summaries: Default::default(),
                        settings_files: Default::default(),
                        scan_id: db_worktree.scan_id as u64,
                        completed_scan_id: db_worktree.completed_scan_id as u64,
                    };

                    // The matching worktree from the request, if the request listed it.
                    let rejoined_worktree = rejoined_project
                        .worktrees
                        .iter()
                        .find(|worktree| worktree.id == db_worktree.id as u64);

                    // File entries
                    {
                        // Listed worktree: only rows with a scan id above the one in
                        // the request. Unlisted worktree: every non-deleted row.
                        let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
                            worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
                        } else {
                            worktree_entry::Column::IsDeleted.eq(false)
                        };

                        let mut db_entries = worktree_entry::Entity::find()
                            .filter(
                                Condition::all()
                                    .add(worktree_entry::Column::ProjectId.eq(project.id))
                                    .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
                                    .add(entry_filter),
                            )
                            .stream(&*tx)
                            .await?;

                        while let Some(db_entry) = db_entries.next().await {
                            let db_entry = db_entry?;
                            if db_entry.is_deleted {
                                worktree.removed_entries.push(db_entry.id as u64);
                            } else {
                                worktree.updated_entries.push(proto::Entry {
                                    id: db_entry.id as u64,
                                    is_dir: db_entry.is_dir,
                                    path: db_entry.path,
                                    inode: db_entry.inode as u64,
                                    mtime: Some(proto::Timestamp {
                                        seconds: db_entry.mtime_seconds as u64,
                                        nanos: db_entry.mtime_nanos as u32,
                                    }),
                                    is_symlink: db_entry.is_symlink,
                                    is_ignored: db_entry.is_ignored,
                                    is_external: db_entry.is_external,
                                    git_status: db_entry.git_status.map(|status| status as i32),
                                });
                            }
                        }
                    }

                    // Repository Entries
                    {
                        // Same selection rule as for file entries above.
                        let repository_entry_filter =
                            if let Some(rejoined_worktree) = rejoined_worktree {
                                worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
                            } else {
                                worktree_repository::Column::IsDeleted.eq(false)
                            };

                        let mut db_repositories = worktree_repository::Entity::find()
                            .filter(
                                Condition::all()
                                    .add(worktree_repository::Column::ProjectId.eq(project.id))
                                    .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
                                    .add(repository_entry_filter),
                            )
                            .stream(&*tx)
                            .await?;

                        while let Some(db_repository) = db_repositories.next().await {
                            let db_repository = db_repository?;
                            if db_repository.is_deleted {
                                worktree
                                    .removed_repositories
                                    .push(db_repository.work_directory_id as u64);
                            } else {
                                worktree.updated_repositories.push(proto::RepositoryEntry {
                                    work_directory_id: db_repository.work_directory_id as u64,
                                    branch: db_repository.branch,
                                });
                            }
                        }
                    }

                    worktrees.push(worktree);
                }

                let language_servers = project
                    .find_related(language_server::Entity)
                    .all(&*tx)
                    .await?
                    .into_iter()
                    .map(|language_server| proto::LanguageServer {
                        id: language_server.id as u64,
                        name: language_server.name,
                    })
                    .collect::<Vec<_>>();

                // Attach each settings file to the worktree it belongs to; files
                // whose worktree was not loaded above are ignored.
                {
                    let mut db_settings_files = worktree_settings_file::Entity::find()
                        .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
                        .stream(&*tx)
                        .await?;
                    while let Some(db_settings_file) = db_settings_files.next().await {
                        let db_settings_file = db_settings_file?;
                        if let Some(worktree) = worktrees
                            .iter_mut()
                            .find(|w| w.id == db_settings_file.worktree_id as u64)
                        {
                            worktree.settings_files.push(WorktreeSettingsFile {
                                path: db_settings_file.path,
                                content: db_settings_file.content,
                            });
                        }
                    }
                }

                let mut collaborators = project
                    .find_related(project_collaborator::Entity)
                    .all(&*tx)
                    .await?;
                // Skip the project when the user is not among its collaborators.
                let self_collaborator = if let Some(self_collaborator_ix) = collaborators
                    .iter()
                    .position(|collaborator| collaborator.user_id == user_id)
                {
                    collaborators.swap_remove(self_collaborator_ix)
                } else {
                    continue;
                };
                let old_connection_id = self_collaborator.connection();
                project_collaborator::Entity::update(project_collaborator::ActiveModel {
                    connection_id: ActiveValue::set(connection.id as i32),
                    connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
                    ..self_collaborator.into_active_model()
                })
                .exec(&*tx)
                .await?;

                let collaborators = collaborators
                    .into_iter()
                    .map(|collaborator| ProjectCollaborator {
                        connection_id: collaborator.connection(),
                        user_id: collaborator.user_id,
                        replica_id: collaborator.replica_id,
                        is_host: collaborator.is_host,
                    })
                    .collect::<Vec<_>>();

                rejoined_projects.push(RejoinedProject {
                    id: project_id,
                    old_connection_id,
                    collaborators,
                    worktrees,
                    language_servers,
                });
            }

            let (channel, room) = self.get_channel_room(room_id, &tx).await?;
            let channel_members = if let Some(channel) = &channel {
                self.get_channel_participants(&channel, &tx).await?
            } else {
                Vec::new()
            };

            Ok(RejoinedRoom {
                room,
                channel_id: channel.map(|channel| channel.id),
                channel_members,
                rejoined_projects,
                reshared_projects,
            })
        })
        .await
    }
771
    /// Removes the participant that answered on `connection` from its room and
    /// cleans up everything tied to that connection.
    ///
    /// Returns `Ok(None)` when no participant row is answered on `connection`.
    /// Otherwise, inside one transaction, it:
    /// - deletes the participant row,
    /// - deletes the unanswered calls placed by the leaving user,
    /// - records, per project the connection collaborated on, the host and the
    ///   other collaborators' connections,
    /// - deletes the connection's collaborator rows and the projects it hosted in
    ///   the room,
    /// - deletes the room row when no participants remain.
    ///
    /// The resulting [`LeftRoom`] contains the room as read before the room row was
    /// (possibly) deleted; `deleted` reports whether the delete affected a row.
    pub async fn leave_room(
        &self,
        connection: ConnectionId,
    ) -> Result<Option<RoomGuard<LeftRoom>>> {
        self.optional_room_transaction(|tx| async move {
            // The room is not known up front; find it via the answering connection.
            let leaving_participant = room_participant::Entity::find()
                .filter(
                    Condition::all()
                        .add(
                            room_participant::Column::AnsweringConnectionId
                                .eq(connection.id as i32),
                        )
                        .add(
                            room_participant::Column::AnsweringConnectionServerId
                                .eq(connection.owner_id as i32),
                        ),
                )
                .one(&*tx)
                .await?;

            if let Some(leaving_participant) = leaving_participant {
                // Leave room.
                let room_id = leaving_participant.room_id;
                room_participant::Entity::delete_by_id(leaving_participant.id)
                    .exec(&*tx)
                    .await?;

                // Cancel pending calls initiated by the leaving user.
                let called_participants = room_participant::Entity::find()
                    .filter(
                        Condition::all()
                            .add(
                                room_participant::Column::CallingUserId
                                    .eq(leaving_participant.user_id),
                            )
                            .add(room_participant::Column::AnsweringConnectionId.is_null()),
                    )
                    .all(&*tx)
                    .await?;
                room_participant::Entity::delete_many()
                    .filter(
                        room_participant::Column::Id
                            .is_in(called_participants.iter().map(|participant| participant.id)),
                    )
                    .exec(&*tx)
                    .await?;
                let canceled_calls_to_user_ids = called_participants
                    .into_iter()
                    .map(|participant| participant.user_id)
                    .collect();

                // Detect left projects.
                #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
                enum QueryProjectIds {
                    ProjectId,
                }
                let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
                    .select_only()
                    .column_as(
                        project_collaborator::Column::ProjectId,
                        QueryProjectIds::ProjectId,
                    )
                    .filter(
                        Condition::all()
                            .add(
                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
                            )
                            .add(
                                project_collaborator::Column::ConnectionServerId
                                    .eq(connection.owner_id as i32),
                            ),
                    )
                    .into_values::<_, QueryProjectIds>()
                    .all(&*tx)
                    .await?;
                // Build one `LeftProject` per project from all of its collaborator
                // rows (the leaving connection's own row included).
                let mut left_projects = HashMap::default();
                let mut collaborators = project_collaborator::Entity::find()
                    .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
                    .stream(&*tx)
                    .await?;
                while let Some(collaborator) = collaborators.next().await {
                    let collaborator = collaborator?;
                    let left_project =
                        left_projects
                            .entry(collaborator.project_id)
                            .or_insert(LeftProject {
                                id: collaborator.project_id,
                                host_user_id: Default::default(),
                                connection_ids: Default::default(),
                                host_connection_id: Default::default(),
                            });

                    // Only the *other* collaborators' connections are listed.
                    let collaborator_connection_id = collaborator.connection();
                    if collaborator_connection_id != connection {
                        left_project.connection_ids.push(collaborator_connection_id);
                    }

                    if collaborator.is_host {
                        left_project.host_user_id = collaborator.user_id;
                        left_project.host_connection_id = collaborator_connection_id;
                    }
                }
                // NOTE(review): the stream is dropped explicitly before further
                // statements run on `tx` — presumably because it borrows the
                // transaction; confirm.
                drop(collaborators);

                // Leave projects.
                project_collaborator::Entity::delete_many()
                    .filter(
                        Condition::all()
                            .add(
                                project_collaborator::Column::ConnectionId.eq(connection.id as i32),
                            )
                            .add(
                                project_collaborator::Column::ConnectionServerId
                                    .eq(connection.owner_id as i32),
                            ),
                    )
                    .exec(&*tx)
                    .await?;

                // Unshare projects.
                project::Entity::delete_many()
                    .filter(
                        Condition::all()
                            .add(project::Column::RoomId.eq(room_id))
                            .add(project::Column::HostConnectionId.eq(connection.id as i32))
                            .add(
                                project::Column::HostConnectionServerId
                                    .eq(connection.owner_id as i32),
                            ),
                    )
                    .exec(&*tx)
                    .await?;

                // Read the room after the cleanup; delete its row if nobody is left.
                let (channel, room) = self.get_channel_room(room_id, &tx).await?;
                let deleted = if room.participants.is_empty() {
                    let result = room::Entity::delete_by_id(room_id).exec(&*tx).await?;
                    result.rows_affected > 0
                } else {
                    false
                };

                let channel_members = if let Some(channel) = &channel {
                    self.get_channel_participants(channel, &tx).await?
                } else {
                    Vec::new()
                };
                let left_room = LeftRoom {
                    room,
                    channel_id: channel.map(|channel| channel.id),
                    channel_members,
                    left_projects,
                    canceled_calls_to_user_ids,
                    deleted,
                };

                // NOTE(review): `self.rooms` is not defined in this part of the
                // file — presumably per-room in-memory state that is no longer
                // needed once the room is empty; confirm.
                if left_room.room.participants.is_empty() {
                    self.rooms.remove(&room_id);
                }

                Ok(Some((room_id, left_room)))
            } else {
                Ok(None)
            }
        })
        .await
    }
938
939 pub async fn update_room_participant_location(
940 &self,
941 room_id: RoomId,
942 connection: ConnectionId,
943 location: proto::ParticipantLocation,
944 ) -> Result<RoomGuard<proto::Room>> {
945 self.room_transaction(room_id, |tx| async {
946 let tx = tx;
947 let location_kind;
948 let location_project_id;
949 match location
950 .variant
951 .as_ref()
952 .ok_or_else(|| anyhow!("invalid location"))?
953 {
954 proto::participant_location::Variant::SharedProject(project) => {
955 location_kind = 0;
956 location_project_id = Some(ProjectId::from_proto(project.id));
957 }
958 proto::participant_location::Variant::UnsharedProject(_) => {
959 location_kind = 1;
960 location_project_id = None;
961 }
962 proto::participant_location::Variant::External(_) => {
963 location_kind = 2;
964 location_project_id = None;
965 }
966 }
967
968 let result = room_participant::Entity::update_many()
969 .filter(
970 Condition::all()
971 .add(room_participant::Column::RoomId.eq(room_id))
972 .add(
973 room_participant::Column::AnsweringConnectionId
974 .eq(connection.id as i32),
975 )
976 .add(
977 room_participant::Column::AnsweringConnectionServerId
978 .eq(connection.owner_id as i32),
979 ),
980 )
981 .set(room_participant::ActiveModel {
982 location_kind: ActiveValue::set(Some(location_kind)),
983 location_project_id: ActiveValue::set(location_project_id),
984 ..Default::default()
985 })
986 .exec(&*tx)
987 .await?;
988
989 if result.rows_affected == 1 {
990 let room = self.get_room(room_id, &tx).await?;
991 Ok(room)
992 } else {
993 Err(anyhow!("could not update room participant location"))?
994 }
995 })
996 .await
997 }
998
999 pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
1000 self.transaction(|tx| async move {
1001 self.room_connection_lost(connection, &*tx).await?;
1002 self.channel_buffer_connection_lost(connection, &*tx)
1003 .await?;
1004 self.channel_chat_connection_lost(connection, &*tx).await?;
1005 Ok(())
1006 })
1007 .await
1008 }
1009
1010 pub async fn room_connection_lost(
1011 &self,
1012 connection: ConnectionId,
1013 tx: &DatabaseTransaction,
1014 ) -> Result<()> {
1015 let participant = room_participant::Entity::find()
1016 .filter(
1017 Condition::all()
1018 .add(room_participant::Column::AnsweringConnectionId.eq(connection.id as i32))
1019 .add(
1020 room_participant::Column::AnsweringConnectionServerId
1021 .eq(connection.owner_id as i32),
1022 ),
1023 )
1024 .one(&*tx)
1025 .await?;
1026
1027 if let Some(participant) = participant {
1028 room_participant::Entity::update(room_participant::ActiveModel {
1029 answering_connection_lost: ActiveValue::set(true),
1030 ..participant.into_active_model()
1031 })
1032 .exec(&*tx)
1033 .await?;
1034 }
1035 Ok(())
1036 }
1037
1038 fn build_incoming_call(
1039 room: &proto::Room,
1040 called_user_id: UserId,
1041 ) -> Option<proto::IncomingCall> {
1042 let pending_participant = room
1043 .pending_participants
1044 .iter()
1045 .find(|participant| participant.user_id == called_user_id.to_proto())?;
1046
1047 Some(proto::IncomingCall {
1048 room_id: room.id,
1049 calling_user_id: pending_participant.calling_user_id,
1050 participant_user_ids: room
1051 .participants
1052 .iter()
1053 .map(|participant| participant.user_id)
1054 .collect(),
1055 initial_project: room.participants.iter().find_map(|participant| {
1056 let initial_project_id = pending_participant.initial_project_id?;
1057 participant
1058 .projects
1059 .iter()
1060 .find(|project| project.id == initial_project_id)
1061 .cloned()
1062 }),
1063 })
1064 }
1065
1066 pub async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1067 let (_, room) = self.get_channel_room(room_id, tx).await?;
1068 Ok(room)
1069 }
1070
1071 pub async fn room_connection_ids(
1072 &self,
1073 room_id: RoomId,
1074 connection_id: ConnectionId,
1075 ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
1076 self.room_transaction(room_id, |tx| async move {
1077 let mut participants = room_participant::Entity::find()
1078 .filter(room_participant::Column::RoomId.eq(room_id))
1079 .stream(&*tx)
1080 .await?;
1081
1082 let mut is_participant = false;
1083 let mut connection_ids = HashSet::default();
1084 while let Some(participant) = participants.next().await {
1085 let participant = participant?;
1086 if let Some(answering_connection) = participant.answering_connection() {
1087 if answering_connection == connection_id {
1088 is_participant = true;
1089 } else {
1090 connection_ids.insert(answering_connection);
1091 }
1092 }
1093 }
1094
1095 if !is_participant {
1096 Err(anyhow!("not a room participant"))?;
1097 }
1098
1099 Ok(connection_ids)
1100 })
1101 .await
1102 }
1103
1104 async fn get_channel_room(
1105 &self,
1106 room_id: RoomId,
1107 tx: &DatabaseTransaction,
1108 ) -> Result<(Option<channel::Model>, proto::Room)> {
1109 let db_room = room::Entity::find_by_id(room_id)
1110 .one(tx)
1111 .await?
1112 .ok_or_else(|| anyhow!("could not find room"))?;
1113
1114 let mut db_participants = db_room
1115 .find_related(room_participant::Entity)
1116 .stream(tx)
1117 .await?;
1118 let mut participants = HashMap::default();
1119 let mut pending_participants = Vec::new();
1120 while let Some(db_participant) = db_participants.next().await {
1121 let db_participant = db_participant?;
1122 if let (
1123 Some(answering_connection_id),
1124 Some(answering_connection_server_id),
1125 Some(participant_index),
1126 ) = (
1127 db_participant.answering_connection_id,
1128 db_participant.answering_connection_server_id,
1129 db_participant.participant_index,
1130 ) {
1131 let location = match (
1132 db_participant.location_kind,
1133 db_participant.location_project_id,
1134 ) {
1135 (Some(0), Some(project_id)) => {
1136 Some(proto::participant_location::Variant::SharedProject(
1137 proto::participant_location::SharedProject {
1138 id: project_id.to_proto(),
1139 },
1140 ))
1141 }
1142 (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1143 Default::default(),
1144 )),
1145 _ => Some(proto::participant_location::Variant::External(
1146 Default::default(),
1147 )),
1148 };
1149
1150 let answering_connection = ConnectionId {
1151 owner_id: answering_connection_server_id.0 as u32,
1152 id: answering_connection_id as u32,
1153 };
1154 participants.insert(
1155 answering_connection,
1156 proto::Participant {
1157 user_id: db_participant.user_id.to_proto(),
1158 peer_id: Some(answering_connection.into()),
1159 projects: Default::default(),
1160 location: Some(proto::ParticipantLocation { variant: location }),
1161 participant_index: participant_index as u32,
1162 role: db_participant.role.unwrap_or(ChannelRole::Member).into(),
1163 },
1164 );
1165 } else {
1166 pending_participants.push(proto::PendingParticipant {
1167 user_id: db_participant.user_id.to_proto(),
1168 calling_user_id: db_participant.calling_user_id.to_proto(),
1169 initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1170 });
1171 }
1172 }
1173 drop(db_participants);
1174
1175 let mut db_projects = db_room
1176 .find_related(project::Entity)
1177 .find_with_related(worktree::Entity)
1178 .stream(tx)
1179 .await?;
1180
1181 while let Some(row) = db_projects.next().await {
1182 let (db_project, db_worktree) = row?;
1183 let host_connection = db_project.host_connection()?;
1184 if let Some(participant) = participants.get_mut(&host_connection) {
1185 let project = if let Some(project) = participant
1186 .projects
1187 .iter_mut()
1188 .find(|project| project.id == db_project.id.to_proto())
1189 {
1190 project
1191 } else {
1192 participant.projects.push(proto::ParticipantProject {
1193 id: db_project.id.to_proto(),
1194 worktree_root_names: Default::default(),
1195 });
1196 participant.projects.last_mut().unwrap()
1197 };
1198
1199 if let Some(db_worktree) = db_worktree {
1200 if db_worktree.visible {
1201 project.worktree_root_names.push(db_worktree.root_name);
1202 }
1203 }
1204 }
1205 }
1206 drop(db_projects);
1207
1208 let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
1209 let mut followers = Vec::new();
1210 while let Some(db_follower) = db_followers.next().await {
1211 let db_follower = db_follower?;
1212 followers.push(proto::Follower {
1213 leader_id: Some(db_follower.leader_connection().into()),
1214 follower_id: Some(db_follower.follower_connection().into()),
1215 project_id: db_follower.project_id.to_proto(),
1216 });
1217 }
1218 drop(db_followers);
1219
1220 let channel = if let Some(channel_id) = db_room.channel_id {
1221 Some(self.get_channel_internal(channel_id, &*tx).await?)
1222 } else {
1223 None
1224 };
1225
1226 Ok((
1227 channel,
1228 proto::Room {
1229 id: db_room.id.to_proto(),
1230 live_kit_room: db_room.live_kit_room,
1231 participants: participants.into_values().collect(),
1232 pending_participants,
1233 followers,
1234 },
1235 ))
1236 }
1237}