1use super::*;
2
3impl Database {
4 pub async fn clear_stale_room_participants(
5 &self,
6 room_id: RoomId,
7 new_server_id: ServerId,
8 ) -> Result<RoomGuard<RefreshedRoom>> {
9 self.room_transaction(room_id, |tx| async move {
10 let stale_participant_filter = Condition::all()
11 .add(room_participant::Column::RoomId.eq(room_id))
12 .add(room_participant::Column::AnsweringConnectionId.is_not_null())
13 .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
14
15 let stale_participant_user_ids = room_participant::Entity::find()
16 .filter(stale_participant_filter.clone())
17 .all(&*tx)
18 .await?
19 .into_iter()
20 .map(|participant| participant.user_id)
21 .collect::<Vec<_>>();
22
23 // Delete participants who failed to reconnect and cancel their calls.
24 let mut canceled_calls_to_user_ids = Vec::new();
25 room_participant::Entity::delete_many()
26 .filter(stale_participant_filter)
27 .exec(&*tx)
28 .await?;
29 let called_participants = room_participant::Entity::find()
30 .filter(
31 Condition::all()
32 .add(
33 room_participant::Column::CallingUserId
34 .is_in(stale_participant_user_ids.iter().copied()),
35 )
36 .add(room_participant::Column::AnsweringConnectionId.is_null()),
37 )
38 .all(&*tx)
39 .await?;
40 room_participant::Entity::delete_many()
41 .filter(
42 room_participant::Column::Id
43 .is_in(called_participants.iter().map(|participant| participant.id)),
44 )
45 .exec(&*tx)
46 .await?;
47 canceled_calls_to_user_ids.extend(
48 called_participants
49 .into_iter()
50 .map(|participant| participant.user_id),
51 );
52
53 let (channel, room) = self.get_channel_room(room_id, &tx).await?;
54 let channel_members;
55 if let Some(channel) = &channel {
56 channel_members = self.get_channel_participants(channel, &tx).await?;
57 } else {
58 channel_members = Vec::new();
59
60 // Delete the room if it becomes empty.
61 if room.participants.is_empty() {
62 project::Entity::delete_many()
63 .filter(project::Column::RoomId.eq(room_id))
64 .exec(&*tx)
65 .await?;
66 room::Entity::delete_by_id(room_id).exec(&*tx).await?;
67 }
68 };
69
70 Ok(RefreshedRoom {
71 room,
72 channel_id: channel.map(|channel| channel.id),
73 channel_members,
74 stale_participant_user_ids,
75 canceled_calls_to_user_ids,
76 })
77 })
78 .await
79 }
80
81 pub async fn incoming_call_for_user(
82 &self,
83 user_id: UserId,
84 ) -> Result<Option<proto::IncomingCall>> {
85 self.transaction(|tx| async move {
86 let pending_participant = room_participant::Entity::find()
87 .filter(
88 room_participant::Column::UserId
89 .eq(user_id)
90 .and(room_participant::Column::AnsweringConnectionId.is_null()),
91 )
92 .one(&*tx)
93 .await?;
94
95 if let Some(pending_participant) = pending_participant {
96 let room = self.get_room(pending_participant.room_id, &tx).await?;
97 Ok(Self::build_incoming_call(&room, user_id))
98 } else {
99 Ok(None)
100 }
101 })
102 .await
103 }
104
105 pub async fn create_room(
106 &self,
107 user_id: UserId,
108 connection: ConnectionId,
109 live_kit_room: &str,
110 release_channel: &str,
111 ) -> Result<proto::Room> {
112 self.transaction(|tx| async move {
113 let room = room::ActiveModel {
114 live_kit_room: ActiveValue::set(live_kit_room.into()),
115 environment: ActiveValue::set(Some(release_channel.to_string())),
116 ..Default::default()
117 }
118 .insert(&*tx)
119 .await?;
120 room_participant::ActiveModel {
121 room_id: ActiveValue::set(room.id),
122 user_id: ActiveValue::set(user_id),
123 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
124 answering_connection_server_id: ActiveValue::set(Some(ServerId(
125 connection.owner_id as i32,
126 ))),
127 answering_connection_lost: ActiveValue::set(false),
128 calling_user_id: ActiveValue::set(user_id),
129 calling_connection_id: ActiveValue::set(connection.id as i32),
130 calling_connection_server_id: ActiveValue::set(Some(ServerId(
131 connection.owner_id as i32,
132 ))),
133 participant_index: ActiveValue::set(Some(0)),
134 role: ActiveValue::set(Some(ChannelRole::Admin)),
135
136 id: ActiveValue::NotSet,
137 location_kind: ActiveValue::NotSet,
138 location_project_id: ActiveValue::NotSet,
139 initial_project_id: ActiveValue::NotSet,
140 }
141 .insert(&*tx)
142 .await?;
143
144 let room = self.get_room(room.id, &tx).await?;
145 Ok(room)
146 })
147 .await
148 }
149
150 pub async fn call(
151 &self,
152 room_id: RoomId,
153 calling_user_id: UserId,
154 calling_connection: ConnectionId,
155 called_user_id: UserId,
156 initial_project_id: Option<ProjectId>,
157 ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
158 self.room_transaction(room_id, |tx| async move {
159 let caller = room_participant::Entity::find()
160 .filter(
161 room_participant::Column::UserId
162 .eq(calling_user_id)
163 .and(room_participant::Column::RoomId.eq(room_id)),
164 )
165 .one(&*tx)
166 .await?
167 .ok_or_else(|| anyhow!("user is not in the room"))?;
168
169 let called_user_role = match caller.role.unwrap_or(ChannelRole::Member) {
170 ChannelRole::Admin | ChannelRole::Member => ChannelRole::Member,
171 ChannelRole::Guest => ChannelRole::Guest,
172 ChannelRole::Banned => return Err(anyhow!("banned users cannot invite").into()),
173 };
174
175 room_participant::ActiveModel {
176 room_id: ActiveValue::set(room_id),
177 user_id: ActiveValue::set(called_user_id),
178 answering_connection_lost: ActiveValue::set(false),
179 participant_index: ActiveValue::NotSet,
180 calling_user_id: ActiveValue::set(calling_user_id),
181 calling_connection_id: ActiveValue::set(calling_connection.id as i32),
182 calling_connection_server_id: ActiveValue::set(Some(ServerId(
183 calling_connection.owner_id as i32,
184 ))),
185 initial_project_id: ActiveValue::set(initial_project_id),
186 role: ActiveValue::set(Some(called_user_role)),
187
188 id: ActiveValue::NotSet,
189 answering_connection_id: ActiveValue::NotSet,
190 answering_connection_server_id: ActiveValue::NotSet,
191 location_kind: ActiveValue::NotSet,
192 location_project_id: ActiveValue::NotSet,
193 }
194 .insert(&*tx)
195 .await?;
196
197 let room = self.get_room(room_id, &tx).await?;
198 let incoming_call = Self::build_incoming_call(&room, called_user_id)
199 .ok_or_else(|| anyhow!("failed to build incoming call"))?;
200 Ok((room, incoming_call))
201 })
202 .await
203 }
204
205 pub async fn call_failed(
206 &self,
207 room_id: RoomId,
208 called_user_id: UserId,
209 ) -> Result<RoomGuard<proto::Room>> {
210 self.room_transaction(room_id, |tx| async move {
211 room_participant::Entity::delete_many()
212 .filter(
213 room_participant::Column::RoomId
214 .eq(room_id)
215 .and(room_participant::Column::UserId.eq(called_user_id)),
216 )
217 .exec(&*tx)
218 .await?;
219 let room = self.get_room(room_id, &tx).await?;
220 Ok(room)
221 })
222 .await
223 }
224
225 pub async fn decline_call(
226 &self,
227 expected_room_id: Option<RoomId>,
228 user_id: UserId,
229 ) -> Result<Option<RoomGuard<proto::Room>>> {
230 self.optional_room_transaction(|tx| async move {
231 let mut filter = Condition::all()
232 .add(room_participant::Column::UserId.eq(user_id))
233 .add(room_participant::Column::AnsweringConnectionId.is_null());
234 if let Some(room_id) = expected_room_id {
235 filter = filter.add(room_participant::Column::RoomId.eq(room_id));
236 }
237 let participant = room_participant::Entity::find()
238 .filter(filter)
239 .one(&*tx)
240 .await?;
241
242 let participant = if let Some(participant) = participant {
243 participant
244 } else if expected_room_id.is_some() {
245 return Err(anyhow!("could not find call to decline"))?;
246 } else {
247 return Ok(None);
248 };
249
250 let room_id = participant.room_id;
251 room_participant::Entity::delete(participant.into_active_model())
252 .exec(&*tx)
253 .await?;
254
255 let room = self.get_room(room_id, &tx).await?;
256 Ok(Some((room_id, room)))
257 })
258 .await
259 }
260
261 pub async fn cancel_call(
262 &self,
263 room_id: RoomId,
264 calling_connection: ConnectionId,
265 called_user_id: UserId,
266 ) -> Result<RoomGuard<proto::Room>> {
267 self.room_transaction(room_id, |tx| async move {
268 let participant = room_participant::Entity::find()
269 .filter(
270 Condition::all()
271 .add(room_participant::Column::UserId.eq(called_user_id))
272 .add(room_participant::Column::RoomId.eq(room_id))
273 .add(
274 room_participant::Column::CallingConnectionId
275 .eq(calling_connection.id as i32),
276 )
277 .add(
278 room_participant::Column::CallingConnectionServerId
279 .eq(calling_connection.owner_id as i32),
280 )
281 .add(room_participant::Column::AnsweringConnectionId.is_null()),
282 )
283 .one(&*tx)
284 .await?
285 .ok_or_else(|| anyhow!("no call to cancel"))?;
286
287 room_participant::Entity::delete(participant.into_active_model())
288 .exec(&*tx)
289 .await?;
290
291 let room = self.get_room(room_id, &tx).await?;
292 Ok(room)
293 })
294 .await
295 }
296
297 pub async fn join_room(
298 &self,
299 room_id: RoomId,
300 user_id: UserId,
301 connection: ConnectionId,
302 environment: &str,
303 ) -> Result<RoomGuard<JoinRoom>> {
304 self.room_transaction(room_id, |tx| async move {
305 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
306 enum QueryChannelIdAndEnvironment {
307 ChannelId,
308 Environment,
309 }
310
311 let (channel_id, release_channel): (Option<ChannelId>, Option<String>) =
312 room::Entity::find()
313 .select_only()
314 .column(room::Column::ChannelId)
315 .column(room::Column::Environment)
316 .filter(room::Column::Id.eq(room_id))
317 .into_values::<_, QueryChannelIdAndEnvironment>()
318 .one(&*tx)
319 .await?
320 .ok_or_else(|| anyhow!("no such room"))?;
321
322 if let Some(release_channel) = release_channel {
323 if &release_channel != environment {
324 Err(anyhow!("must join using the {} release", release_channel))?;
325 }
326 }
327
328 if channel_id.is_some() {
329 Err(anyhow!("tried to join channel call directly"))?
330 }
331
332 let participant_index = self
333 .get_next_participant_index_internal(room_id, &*tx)
334 .await?;
335
336 let result = room_participant::Entity::update_many()
337 .filter(
338 Condition::all()
339 .add(room_participant::Column::RoomId.eq(room_id))
340 .add(room_participant::Column::UserId.eq(user_id))
341 .add(room_participant::Column::AnsweringConnectionId.is_null()),
342 )
343 .set(room_participant::ActiveModel {
344 participant_index: ActiveValue::Set(Some(participant_index)),
345 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
346 answering_connection_server_id: ActiveValue::set(Some(ServerId(
347 connection.owner_id as i32,
348 ))),
349 answering_connection_lost: ActiveValue::set(false),
350 ..Default::default()
351 })
352 .exec(&*tx)
353 .await?;
354 if result.rows_affected == 0 {
355 Err(anyhow!("room does not exist or was already joined"))?;
356 }
357
358 let room = self.get_room(room_id, &tx).await?;
359 Ok(JoinRoom {
360 room,
361 channel_id: None,
362 channel_members: vec![],
363 })
364 })
365 .await
366 }
367
368 async fn get_next_participant_index_internal(
369 &self,
370 room_id: RoomId,
371 tx: &DatabaseTransaction,
372 ) -> Result<i32> {
373 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
374 enum QueryParticipantIndices {
375 ParticipantIndex,
376 }
377 let existing_participant_indices: Vec<i32> = room_participant::Entity::find()
378 .filter(
379 room_participant::Column::RoomId
380 .eq(room_id)
381 .and(room_participant::Column::ParticipantIndex.is_not_null()),
382 )
383 .select_only()
384 .column(room_participant::Column::ParticipantIndex)
385 .into_values::<_, QueryParticipantIndices>()
386 .all(&*tx)
387 .await?;
388
389 let mut participant_index = 0;
390 while existing_participant_indices.contains(&participant_index) {
391 participant_index += 1;
392 }
393
394 Ok(participant_index)
395 }
396
397 pub async fn channel_id_for_room(&self, room_id: RoomId) -> Result<Option<ChannelId>> {
398 self.transaction(|tx| async move {
399 let room: Option<room::Model> = room::Entity::find()
400 .filter(room::Column::Id.eq(room_id))
401 .one(&*tx)
402 .await?;
403
404 Ok(room.and_then(|room| room.channel_id))
405 })
406 .await
407 }
408
409 pub(crate) async fn join_channel_room_internal(
410 &self,
411 room_id: RoomId,
412 user_id: UserId,
413 connection: ConnectionId,
414 role: ChannelRole,
415 tx: &DatabaseTransaction,
416 ) -> Result<JoinRoom> {
417 let participant_index = self
418 .get_next_participant_index_internal(room_id, &*tx)
419 .await?;
420
421 room_participant::Entity::insert_many([room_participant::ActiveModel {
422 room_id: ActiveValue::set(room_id),
423 user_id: ActiveValue::set(user_id),
424 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
425 answering_connection_server_id: ActiveValue::set(Some(ServerId(
426 connection.owner_id as i32,
427 ))),
428 answering_connection_lost: ActiveValue::set(false),
429 calling_user_id: ActiveValue::set(user_id),
430 calling_connection_id: ActiveValue::set(connection.id as i32),
431 calling_connection_server_id: ActiveValue::set(Some(ServerId(
432 connection.owner_id as i32,
433 ))),
434 participant_index: ActiveValue::Set(Some(participant_index)),
435 role: ActiveValue::set(Some(role)),
436 id: ActiveValue::NotSet,
437 location_kind: ActiveValue::NotSet,
438 location_project_id: ActiveValue::NotSet,
439 initial_project_id: ActiveValue::NotSet,
440 }])
441 .on_conflict(
442 OnConflict::columns([room_participant::Column::UserId])
443 .update_columns([
444 room_participant::Column::AnsweringConnectionId,
445 room_participant::Column::AnsweringConnectionServerId,
446 room_participant::Column::AnsweringConnectionLost,
447 room_participant::Column::ParticipantIndex,
448 room_participant::Column::Role,
449 ])
450 .to_owned(),
451 )
452 .exec(&*tx)
453 .await?;
454
455 let (channel, room) = self.get_channel_room(room_id, &tx).await?;
456 let channel = channel.ok_or_else(|| anyhow!("no channel for room"))?;
457 let channel_members = self.get_channel_participants(&channel, &*tx).await?;
458 Ok(JoinRoom {
459 room,
460 channel_id: Some(channel.id),
461 channel_members,
462 })
463 }
464
465 pub async fn rejoin_room(
466 &self,
467 rejoin_room: proto::RejoinRoom,
468 user_id: UserId,
469 connection: ConnectionId,
470 ) -> Result<RoomGuard<RejoinedRoom>> {
471 let room_id = RoomId::from_proto(rejoin_room.id);
472 self.room_transaction(room_id, |tx| async {
473 let tx = tx;
474 let participant_update = room_participant::Entity::update_many()
475 .filter(
476 Condition::all()
477 .add(room_participant::Column::RoomId.eq(room_id))
478 .add(room_participant::Column::UserId.eq(user_id))
479 .add(room_participant::Column::AnsweringConnectionId.is_not_null())
480 .add(
481 Condition::any()
482 .add(room_participant::Column::AnsweringConnectionLost.eq(true))
483 .add(
484 room_participant::Column::AnsweringConnectionServerId
485 .ne(connection.owner_id as i32),
486 ),
487 ),
488 )
489 .set(room_participant::ActiveModel {
490 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
491 answering_connection_server_id: ActiveValue::set(Some(ServerId(
492 connection.owner_id as i32,
493 ))),
494 answering_connection_lost: ActiveValue::set(false),
495 ..Default::default()
496 })
497 .exec(&*tx)
498 .await?;
499 if participant_update.rows_affected == 0 {
500 return Err(anyhow!("room does not exist or was already joined"))?;
501 }
502
503 let mut reshared_projects = Vec::new();
504 for reshared_project in &rejoin_room.reshared_projects {
505 let project_id = ProjectId::from_proto(reshared_project.project_id);
506 let project = project::Entity::find_by_id(project_id)
507 .one(&*tx)
508 .await?
509 .ok_or_else(|| anyhow!("project does not exist"))?;
510 if project.host_user_id != user_id {
511 return Err(anyhow!("no such project"))?;
512 }
513
514 let mut collaborators = project
515 .find_related(project_collaborator::Entity)
516 .all(&*tx)
517 .await?;
518 let host_ix = collaborators
519 .iter()
520 .position(|collaborator| {
521 collaborator.user_id == user_id && collaborator.is_host
522 })
523 .ok_or_else(|| anyhow!("host not found among collaborators"))?;
524 let host = collaborators.swap_remove(host_ix);
525 let old_connection_id = host.connection();
526
527 project::Entity::update(project::ActiveModel {
528 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
529 host_connection_server_id: ActiveValue::set(Some(ServerId(
530 connection.owner_id as i32,
531 ))),
532 ..project.into_active_model()
533 })
534 .exec(&*tx)
535 .await?;
536 project_collaborator::Entity::update(project_collaborator::ActiveModel {
537 connection_id: ActiveValue::set(connection.id as i32),
538 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
539 ..host.into_active_model()
540 })
541 .exec(&*tx)
542 .await?;
543
544 self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
545 .await?;
546
547 reshared_projects.push(ResharedProject {
548 id: project_id,
549 old_connection_id,
550 collaborators: collaborators
551 .iter()
552 .map(|collaborator| ProjectCollaborator {
553 connection_id: collaborator.connection(),
554 user_id: collaborator.user_id,
555 replica_id: collaborator.replica_id,
556 is_host: collaborator.is_host,
557 })
558 .collect(),
559 worktrees: reshared_project.worktrees.clone(),
560 });
561 }
562
563 project::Entity::delete_many()
564 .filter(
565 Condition::all()
566 .add(project::Column::RoomId.eq(room_id))
567 .add(project::Column::HostUserId.eq(user_id))
568 .add(
569 project::Column::Id
570 .is_not_in(reshared_projects.iter().map(|project| project.id)),
571 ),
572 )
573 .exec(&*tx)
574 .await?;
575
576 let mut rejoined_projects = Vec::new();
577 for rejoined_project in &rejoin_room.rejoined_projects {
578 let project_id = ProjectId::from_proto(rejoined_project.id);
579 let Some(project) = project::Entity::find_by_id(project_id).one(&*tx).await? else {
580 continue;
581 };
582
583 let mut worktrees = Vec::new();
584 let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
585 for db_worktree in db_worktrees {
586 let mut worktree = RejoinedWorktree {
587 id: db_worktree.id as u64,
588 abs_path: db_worktree.abs_path,
589 root_name: db_worktree.root_name,
590 visible: db_worktree.visible,
591 updated_entries: Default::default(),
592 removed_entries: Default::default(),
593 updated_repositories: Default::default(),
594 removed_repositories: Default::default(),
595 diagnostic_summaries: Default::default(),
596 settings_files: Default::default(),
597 scan_id: db_worktree.scan_id as u64,
598 completed_scan_id: db_worktree.completed_scan_id as u64,
599 };
600
601 let rejoined_worktree = rejoined_project
602 .worktrees
603 .iter()
604 .find(|worktree| worktree.id == db_worktree.id as u64);
605
606 // File entries
607 {
608 let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
609 worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
610 } else {
611 worktree_entry::Column::IsDeleted.eq(false)
612 };
613
614 let mut db_entries = worktree_entry::Entity::find()
615 .filter(
616 Condition::all()
617 .add(worktree_entry::Column::ProjectId.eq(project.id))
618 .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
619 .add(entry_filter),
620 )
621 .stream(&*tx)
622 .await?;
623
624 while let Some(db_entry) = db_entries.next().await {
625 let db_entry = db_entry?;
626 if db_entry.is_deleted {
627 worktree.removed_entries.push(db_entry.id as u64);
628 } else {
629 worktree.updated_entries.push(proto::Entry {
630 id: db_entry.id as u64,
631 is_dir: db_entry.is_dir,
632 path: db_entry.path,
633 inode: db_entry.inode as u64,
634 mtime: Some(proto::Timestamp {
635 seconds: db_entry.mtime_seconds as u64,
636 nanos: db_entry.mtime_nanos as u32,
637 }),
638 is_symlink: db_entry.is_symlink,
639 is_ignored: db_entry.is_ignored,
640 is_external: db_entry.is_external,
641 git_status: db_entry.git_status.map(|status| status as i32),
642 });
643 }
644 }
645 }
646
647 // Repository Entries
648 {
649 let repository_entry_filter =
650 if let Some(rejoined_worktree) = rejoined_worktree {
651 worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
652 } else {
653 worktree_repository::Column::IsDeleted.eq(false)
654 };
655
656 let mut db_repositories = worktree_repository::Entity::find()
657 .filter(
658 Condition::all()
659 .add(worktree_repository::Column::ProjectId.eq(project.id))
660 .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
661 .add(repository_entry_filter),
662 )
663 .stream(&*tx)
664 .await?;
665
666 while let Some(db_repository) = db_repositories.next().await {
667 let db_repository = db_repository?;
668 if db_repository.is_deleted {
669 worktree
670 .removed_repositories
671 .push(db_repository.work_directory_id as u64);
672 } else {
673 worktree.updated_repositories.push(proto::RepositoryEntry {
674 work_directory_id: db_repository.work_directory_id as u64,
675 branch: db_repository.branch,
676 });
677 }
678 }
679 }
680
681 worktrees.push(worktree);
682 }
683
684 let language_servers = project
685 .find_related(language_server::Entity)
686 .all(&*tx)
687 .await?
688 .into_iter()
689 .map(|language_server| proto::LanguageServer {
690 id: language_server.id as u64,
691 name: language_server.name,
692 })
693 .collect::<Vec<_>>();
694
695 {
696 let mut db_settings_files = worktree_settings_file::Entity::find()
697 .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
698 .stream(&*tx)
699 .await?;
700 while let Some(db_settings_file) = db_settings_files.next().await {
701 let db_settings_file = db_settings_file?;
702 if let Some(worktree) = worktrees
703 .iter_mut()
704 .find(|w| w.id == db_settings_file.worktree_id as u64)
705 {
706 worktree.settings_files.push(WorktreeSettingsFile {
707 path: db_settings_file.path,
708 content: db_settings_file.content,
709 });
710 }
711 }
712 }
713
714 let mut collaborators = project
715 .find_related(project_collaborator::Entity)
716 .all(&*tx)
717 .await?;
718 let self_collaborator = if let Some(self_collaborator_ix) = collaborators
719 .iter()
720 .position(|collaborator| collaborator.user_id == user_id)
721 {
722 collaborators.swap_remove(self_collaborator_ix)
723 } else {
724 continue;
725 };
726 let old_connection_id = self_collaborator.connection();
727 project_collaborator::Entity::update(project_collaborator::ActiveModel {
728 connection_id: ActiveValue::set(connection.id as i32),
729 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
730 ..self_collaborator.into_active_model()
731 })
732 .exec(&*tx)
733 .await?;
734
735 let collaborators = collaborators
736 .into_iter()
737 .map(|collaborator| ProjectCollaborator {
738 connection_id: collaborator.connection(),
739 user_id: collaborator.user_id,
740 replica_id: collaborator.replica_id,
741 is_host: collaborator.is_host,
742 })
743 .collect::<Vec<_>>();
744
745 rejoined_projects.push(RejoinedProject {
746 id: project_id,
747 old_connection_id,
748 collaborators,
749 worktrees,
750 language_servers,
751 });
752 }
753
754 let (channel, room) = self.get_channel_room(room_id, &tx).await?;
755 let channel_members = if let Some(channel) = &channel {
756 self.get_channel_participants(&channel, &tx).await?
757 } else {
758 Vec::new()
759 };
760
761 Ok(RejoinedRoom {
762 room,
763 channel_id: channel.map(|channel| channel.id),
764 channel_members,
765 rejoined_projects,
766 reshared_projects,
767 })
768 })
769 .await
770 }
771
772 pub async fn leave_room(
773 &self,
774 connection: ConnectionId,
775 ) -> Result<Option<RoomGuard<LeftRoom>>> {
776 self.optional_room_transaction(|tx| async move {
777 let leaving_participant = room_participant::Entity::find()
778 .filter(
779 Condition::all()
780 .add(
781 room_participant::Column::AnsweringConnectionId
782 .eq(connection.id as i32),
783 )
784 .add(
785 room_participant::Column::AnsweringConnectionServerId
786 .eq(connection.owner_id as i32),
787 ),
788 )
789 .one(&*tx)
790 .await?;
791
792 if let Some(leaving_participant) = leaving_participant {
793 // Leave room.
794 let room_id = leaving_participant.room_id;
795 room_participant::Entity::delete_by_id(leaving_participant.id)
796 .exec(&*tx)
797 .await?;
798
799 // Cancel pending calls initiated by the leaving user.
800 let called_participants = room_participant::Entity::find()
801 .filter(
802 Condition::all()
803 .add(
804 room_participant::Column::CallingUserId
805 .eq(leaving_participant.user_id),
806 )
807 .add(room_participant::Column::AnsweringConnectionId.is_null()),
808 )
809 .all(&*tx)
810 .await?;
811 room_participant::Entity::delete_many()
812 .filter(
813 room_participant::Column::Id
814 .is_in(called_participants.iter().map(|participant| participant.id)),
815 )
816 .exec(&*tx)
817 .await?;
818 let canceled_calls_to_user_ids = called_participants
819 .into_iter()
820 .map(|participant| participant.user_id)
821 .collect();
822
823 // Detect left projects.
824 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
825 enum QueryProjectIds {
826 ProjectId,
827 }
828 let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
829 .select_only()
830 .column_as(
831 project_collaborator::Column::ProjectId,
832 QueryProjectIds::ProjectId,
833 )
834 .filter(
835 Condition::all()
836 .add(
837 project_collaborator::Column::ConnectionId.eq(connection.id as i32),
838 )
839 .add(
840 project_collaborator::Column::ConnectionServerId
841 .eq(connection.owner_id as i32),
842 ),
843 )
844 .into_values::<_, QueryProjectIds>()
845 .all(&*tx)
846 .await?;
847 let mut left_projects = HashMap::default();
848 let mut collaborators = project_collaborator::Entity::find()
849 .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
850 .stream(&*tx)
851 .await?;
852 while let Some(collaborator) = collaborators.next().await {
853 let collaborator = collaborator?;
854 let left_project =
855 left_projects
856 .entry(collaborator.project_id)
857 .or_insert(LeftProject {
858 id: collaborator.project_id,
859 host_user_id: Default::default(),
860 connection_ids: Default::default(),
861 host_connection_id: Default::default(),
862 });
863
864 let collaborator_connection_id = collaborator.connection();
865 if collaborator_connection_id != connection {
866 left_project.connection_ids.push(collaborator_connection_id);
867 }
868
869 if collaborator.is_host {
870 left_project.host_user_id = collaborator.user_id;
871 left_project.host_connection_id = collaborator_connection_id;
872 }
873 }
874 drop(collaborators);
875
876 // Leave projects.
877 project_collaborator::Entity::delete_many()
878 .filter(
879 Condition::all()
880 .add(
881 project_collaborator::Column::ConnectionId.eq(connection.id as i32),
882 )
883 .add(
884 project_collaborator::Column::ConnectionServerId
885 .eq(connection.owner_id as i32),
886 ),
887 )
888 .exec(&*tx)
889 .await?;
890
891 follower::Entity::delete_many()
892 .filter(
893 Condition::all()
894 .add(follower::Column::FollowerConnectionId.eq(connection.id as i32)),
895 )
896 .exec(&*tx)
897 .await?;
898
899 // Unshare projects.
900 project::Entity::delete_many()
901 .filter(
902 Condition::all()
903 .add(project::Column::RoomId.eq(room_id))
904 .add(project::Column::HostConnectionId.eq(connection.id as i32))
905 .add(
906 project::Column::HostConnectionServerId
907 .eq(connection.owner_id as i32),
908 ),
909 )
910 .exec(&*tx)
911 .await?;
912
913 let (channel, room) = self.get_channel_room(room_id, &tx).await?;
914 let deleted = if room.participants.is_empty() {
915 let result = room::Entity::delete_by_id(room_id).exec(&*tx).await?;
916 result.rows_affected > 0
917 } else {
918 false
919 };
920
921 let channel_members = if let Some(channel) = &channel {
922 self.get_channel_participants(channel, &tx).await?
923 } else {
924 Vec::new()
925 };
926 let left_room = LeftRoom {
927 room,
928 channel_id: channel.map(|channel| channel.id),
929 channel_members,
930 left_projects,
931 canceled_calls_to_user_ids,
932 deleted,
933 };
934
935 if left_room.room.participants.is_empty() {
936 self.rooms.remove(&room_id);
937 }
938
939 Ok(Some((room_id, left_room)))
940 } else {
941 Ok(None)
942 }
943 })
944 .await
945 }
946
947 pub async fn update_room_participant_location(
948 &self,
949 room_id: RoomId,
950 connection: ConnectionId,
951 location: proto::ParticipantLocation,
952 ) -> Result<RoomGuard<proto::Room>> {
953 self.room_transaction(room_id, |tx| async {
954 let tx = tx;
955 let location_kind;
956 let location_project_id;
957 match location
958 .variant
959 .as_ref()
960 .ok_or_else(|| anyhow!("invalid location"))?
961 {
962 proto::participant_location::Variant::SharedProject(project) => {
963 location_kind = 0;
964 location_project_id = Some(ProjectId::from_proto(project.id));
965 }
966 proto::participant_location::Variant::UnsharedProject(_) => {
967 location_kind = 1;
968 location_project_id = None;
969 }
970 proto::participant_location::Variant::External(_) => {
971 location_kind = 2;
972 location_project_id = None;
973 }
974 }
975
976 let result = room_participant::Entity::update_many()
977 .filter(
978 Condition::all()
979 .add(room_participant::Column::RoomId.eq(room_id))
980 .add(
981 room_participant::Column::AnsweringConnectionId
982 .eq(connection.id as i32),
983 )
984 .add(
985 room_participant::Column::AnsweringConnectionServerId
986 .eq(connection.owner_id as i32),
987 ),
988 )
989 .set(room_participant::ActiveModel {
990 location_kind: ActiveValue::set(Some(location_kind)),
991 location_project_id: ActiveValue::set(location_project_id),
992 ..Default::default()
993 })
994 .exec(&*tx)
995 .await?;
996
997 if result.rows_affected == 1 {
998 let room = self.get_room(room_id, &tx).await?;
999 Ok(room)
1000 } else {
1001 Err(anyhow!("could not update room participant location"))?
1002 }
1003 })
1004 .await
1005 }
1006
1007 pub async fn set_room_participant_role(
1008 &self,
1009 admin_id: UserId,
1010 room_id: RoomId,
1011 user_id: UserId,
1012 role: ChannelRole,
1013 ) -> Result<RoomGuard<proto::Room>> {
1014 self.room_transaction(room_id, |tx| async move {
1015 room_participant::Entity::find()
1016 .filter(
1017 Condition::all()
1018 .add(room_participant::Column::RoomId.eq(room_id))
1019 .add(room_participant::Column::UserId.eq(admin_id))
1020 .add(room_participant::Column::Role.eq(ChannelRole::Admin)),
1021 )
1022 .one(&*tx)
1023 .await?
1024 .ok_or_else(|| anyhow!("only admins can set participant role"))?;
1025
1026 let result = room_participant::Entity::update_many()
1027 .filter(
1028 Condition::all()
1029 .add(room_participant::Column::RoomId.eq(room_id))
1030 .add(room_participant::Column::UserId.eq(user_id)),
1031 )
1032 .set(room_participant::ActiveModel {
1033 role: ActiveValue::set(Some(ChannelRole::from(role))),
1034 ..Default::default()
1035 })
1036 .exec(&*tx)
1037 .await?;
1038
1039 if result.rows_affected != 1 {
1040 Err(anyhow!("could not update room participant role"))?;
1041 }
1042 Ok(self.get_room(room_id, &tx).await?)
1043 })
1044 .await
1045 }
1046
1047 pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
1048 self.transaction(|tx| async move {
1049 self.room_connection_lost(connection, &*tx).await?;
1050 self.channel_buffer_connection_lost(connection, &*tx)
1051 .await?;
1052 self.channel_chat_connection_lost(connection, &*tx).await?;
1053 Ok(())
1054 })
1055 .await
1056 }
1057
1058 pub async fn room_connection_lost(
1059 &self,
1060 connection: ConnectionId,
1061 tx: &DatabaseTransaction,
1062 ) -> Result<()> {
1063 let participant = room_participant::Entity::find()
1064 .filter(
1065 Condition::all()
1066 .add(room_participant::Column::AnsweringConnectionId.eq(connection.id as i32))
1067 .add(
1068 room_participant::Column::AnsweringConnectionServerId
1069 .eq(connection.owner_id as i32),
1070 ),
1071 )
1072 .one(&*tx)
1073 .await?;
1074
1075 if let Some(participant) = participant {
1076 room_participant::Entity::update(room_participant::ActiveModel {
1077 answering_connection_lost: ActiveValue::set(true),
1078 ..participant.into_active_model()
1079 })
1080 .exec(&*tx)
1081 .await?;
1082 }
1083 Ok(())
1084 }
1085
1086 fn build_incoming_call(
1087 room: &proto::Room,
1088 called_user_id: UserId,
1089 ) -> Option<proto::IncomingCall> {
1090 let pending_participant = room
1091 .pending_participants
1092 .iter()
1093 .find(|participant| participant.user_id == called_user_id.to_proto())?;
1094
1095 Some(proto::IncomingCall {
1096 room_id: room.id,
1097 calling_user_id: pending_participant.calling_user_id,
1098 participant_user_ids: room
1099 .participants
1100 .iter()
1101 .map(|participant| participant.user_id)
1102 .collect(),
1103 initial_project: room.participants.iter().find_map(|participant| {
1104 let initial_project_id = pending_participant.initial_project_id?;
1105 participant
1106 .projects
1107 .iter()
1108 .find(|project| project.id == initial_project_id)
1109 .cloned()
1110 }),
1111 })
1112 }
1113
1114 pub async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1115 let (_, room) = self.get_channel_room(room_id, tx).await?;
1116 Ok(room)
1117 }
1118
1119 pub async fn room_connection_ids(
1120 &self,
1121 room_id: RoomId,
1122 connection_id: ConnectionId,
1123 ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
1124 self.room_transaction(room_id, |tx| async move {
1125 let mut participants = room_participant::Entity::find()
1126 .filter(room_participant::Column::RoomId.eq(room_id))
1127 .stream(&*tx)
1128 .await?;
1129
1130 let mut is_participant = false;
1131 let mut connection_ids = HashSet::default();
1132 while let Some(participant) = participants.next().await {
1133 let participant = participant?;
1134 if let Some(answering_connection) = participant.answering_connection() {
1135 if answering_connection == connection_id {
1136 is_participant = true;
1137 } else {
1138 connection_ids.insert(answering_connection);
1139 }
1140 }
1141 }
1142
1143 if !is_participant {
1144 Err(anyhow!("not a room participant"))?;
1145 }
1146
1147 Ok(connection_ids)
1148 })
1149 .await
1150 }
1151
1152 async fn get_channel_room(
1153 &self,
1154 room_id: RoomId,
1155 tx: &DatabaseTransaction,
1156 ) -> Result<(Option<channel::Model>, proto::Room)> {
1157 let db_room = room::Entity::find_by_id(room_id)
1158 .one(tx)
1159 .await?
1160 .ok_or_else(|| anyhow!("could not find room"))?;
1161
1162 let mut db_participants = db_room
1163 .find_related(room_participant::Entity)
1164 .stream(tx)
1165 .await?;
1166 let mut participants = HashMap::default();
1167 let mut pending_participants = Vec::new();
1168 while let Some(db_participant) = db_participants.next().await {
1169 let db_participant = db_participant?;
1170 if let (
1171 Some(answering_connection_id),
1172 Some(answering_connection_server_id),
1173 Some(participant_index),
1174 ) = (
1175 db_participant.answering_connection_id,
1176 db_participant.answering_connection_server_id,
1177 db_participant.participant_index,
1178 ) {
1179 let location = match (
1180 db_participant.location_kind,
1181 db_participant.location_project_id,
1182 ) {
1183 (Some(0), Some(project_id)) => {
1184 Some(proto::participant_location::Variant::SharedProject(
1185 proto::participant_location::SharedProject {
1186 id: project_id.to_proto(),
1187 },
1188 ))
1189 }
1190 (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1191 Default::default(),
1192 )),
1193 _ => Some(proto::participant_location::Variant::External(
1194 Default::default(),
1195 )),
1196 };
1197
1198 let answering_connection = ConnectionId {
1199 owner_id: answering_connection_server_id.0 as u32,
1200 id: answering_connection_id as u32,
1201 };
1202 participants.insert(
1203 answering_connection,
1204 proto::Participant {
1205 user_id: db_participant.user_id.to_proto(),
1206 peer_id: Some(answering_connection.into()),
1207 projects: Default::default(),
1208 location: Some(proto::ParticipantLocation { variant: location }),
1209 participant_index: participant_index as u32,
1210 role: db_participant.role.unwrap_or(ChannelRole::Member).into(),
1211 },
1212 );
1213 } else {
1214 pending_participants.push(proto::PendingParticipant {
1215 user_id: db_participant.user_id.to_proto(),
1216 calling_user_id: db_participant.calling_user_id.to_proto(),
1217 initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1218 });
1219 }
1220 }
1221 drop(db_participants);
1222
1223 let mut db_projects = db_room
1224 .find_related(project::Entity)
1225 .find_with_related(worktree::Entity)
1226 .stream(tx)
1227 .await?;
1228
1229 while let Some(row) = db_projects.next().await {
1230 let (db_project, db_worktree) = row?;
1231 let host_connection = db_project.host_connection()?;
1232 if let Some(participant) = participants.get_mut(&host_connection) {
1233 let project = if let Some(project) = participant
1234 .projects
1235 .iter_mut()
1236 .find(|project| project.id == db_project.id.to_proto())
1237 {
1238 project
1239 } else {
1240 participant.projects.push(proto::ParticipantProject {
1241 id: db_project.id.to_proto(),
1242 worktree_root_names: Default::default(),
1243 });
1244 participant.projects.last_mut().unwrap()
1245 };
1246
1247 if let Some(db_worktree) = db_worktree {
1248 if db_worktree.visible {
1249 project.worktree_root_names.push(db_worktree.root_name);
1250 }
1251 }
1252 }
1253 }
1254 drop(db_projects);
1255
1256 let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
1257 let mut followers = Vec::new();
1258 while let Some(db_follower) = db_followers.next().await {
1259 let db_follower = db_follower?;
1260 followers.push(proto::Follower {
1261 leader_id: Some(db_follower.leader_connection().into()),
1262 follower_id: Some(db_follower.follower_connection().into()),
1263 project_id: db_follower.project_id.to_proto(),
1264 });
1265 }
1266 drop(db_followers);
1267
1268 let channel = if let Some(channel_id) = db_room.channel_id {
1269 Some(self.get_channel_internal(channel_id, &*tx).await?)
1270 } else {
1271 None
1272 };
1273
1274 Ok((
1275 channel,
1276 proto::Room {
1277 id: db_room.id.to_proto(),
1278 live_kit_room: db_room.live_kit_room,
1279 participants: participants.into_values().collect(),
1280 pending_participants,
1281 followers,
1282 },
1283 ))
1284 }
1285}