1use super::*;
2
3impl Database {
4 /// Clears all room participants in rooms attached to a stale server.
5 pub async fn clear_stale_room_participants(
6 &self,
7 room_id: RoomId,
8 new_server_id: ServerId,
9 ) -> Result<RoomGuard<RefreshedRoom>> {
10 self.room_transaction(room_id, |tx| async move {
11 let stale_participant_filter = Condition::all()
12 .add(room_participant::Column::RoomId.eq(room_id))
13 .add(room_participant::Column::AnsweringConnectionId.is_not_null())
14 .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
15
16 let stale_participant_user_ids = room_participant::Entity::find()
17 .filter(stale_participant_filter.clone())
18 .all(&*tx)
19 .await?
20 .into_iter()
21 .map(|participant| participant.user_id)
22 .collect::<Vec<_>>();
23
24 // Delete participants who failed to reconnect and cancel their calls.
25 let mut canceled_calls_to_user_ids = Vec::new();
26 room_participant::Entity::delete_many()
27 .filter(stale_participant_filter)
28 .exec(&*tx)
29 .await?;
30 let called_participants = room_participant::Entity::find()
31 .filter(
32 Condition::all()
33 .add(
34 room_participant::Column::CallingUserId
35 .is_in(stale_participant_user_ids.iter().copied()),
36 )
37 .add(room_participant::Column::AnsweringConnectionId.is_null()),
38 )
39 .all(&*tx)
40 .await?;
41 room_participant::Entity::delete_many()
42 .filter(
43 room_participant::Column::Id
44 .is_in(called_participants.iter().map(|participant| participant.id)),
45 )
46 .exec(&*tx)
47 .await?;
48 canceled_calls_to_user_ids.extend(
49 called_participants
50 .into_iter()
51 .map(|participant| participant.user_id),
52 );
53
54 let (channel, room) = self.get_channel_room(room_id, &tx).await?;
55 let channel_members;
56 if let Some(channel) = &channel {
57 channel_members = self.get_channel_participants(channel, &tx).await?;
58 } else {
59 channel_members = Vec::new();
60
61 // Delete the room if it becomes empty.
62 if room.participants.is_empty() {
63 project::Entity::delete_many()
64 .filter(project::Column::RoomId.eq(room_id))
65 .exec(&*tx)
66 .await?;
67 room::Entity::delete_by_id(room_id).exec(&*tx).await?;
68 }
69 };
70
71 Ok(RefreshedRoom {
72 room,
73 channel_id: channel.map(|channel| channel.id),
74 channel_members,
75 stale_participant_user_ids,
76 canceled_calls_to_user_ids,
77 })
78 })
79 .await
80 }
81
82 /// Returns the incoming calls for user with the given ID.
83 pub async fn incoming_call_for_user(
84 &self,
85 user_id: UserId,
86 ) -> Result<Option<proto::IncomingCall>> {
87 self.transaction(|tx| async move {
88 let pending_participant = room_participant::Entity::find()
89 .filter(
90 room_participant::Column::UserId
91 .eq(user_id)
92 .and(room_participant::Column::AnsweringConnectionId.is_null()),
93 )
94 .one(&*tx)
95 .await?;
96
97 if let Some(pending_participant) = pending_participant {
98 let room = self.get_room(pending_participant.room_id, &tx).await?;
99 Ok(Self::build_incoming_call(&room, user_id))
100 } else {
101 Ok(None)
102 }
103 })
104 .await
105 }
106
107 /// Creates a new room.
108 pub async fn create_room(
109 &self,
110 user_id: UserId,
111 connection: ConnectionId,
112 live_kit_room: &str,
113 release_channel: &str,
114 ) -> Result<proto::Room> {
115 self.transaction(|tx| async move {
116 let room = room::ActiveModel {
117 live_kit_room: ActiveValue::set(live_kit_room.into()),
118 environment: ActiveValue::set(Some(release_channel.to_string())),
119 ..Default::default()
120 }
121 .insert(&*tx)
122 .await?;
123 room_participant::ActiveModel {
124 room_id: ActiveValue::set(room.id),
125 user_id: ActiveValue::set(user_id),
126 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
127 answering_connection_server_id: ActiveValue::set(Some(ServerId(
128 connection.owner_id as i32,
129 ))),
130 answering_connection_lost: ActiveValue::set(false),
131 calling_user_id: ActiveValue::set(user_id),
132 calling_connection_id: ActiveValue::set(connection.id as i32),
133 calling_connection_server_id: ActiveValue::set(Some(ServerId(
134 connection.owner_id as i32,
135 ))),
136 participant_index: ActiveValue::set(Some(0)),
137 role: ActiveValue::set(Some(ChannelRole::Admin)),
138
139 id: ActiveValue::NotSet,
140 location_kind: ActiveValue::NotSet,
141 location_project_id: ActiveValue::NotSet,
142 initial_project_id: ActiveValue::NotSet,
143 }
144 .insert(&*tx)
145 .await?;
146
147 let room = self.get_room(room.id, &tx).await?;
148 Ok(room)
149 })
150 .await
151 }
152
153 pub async fn call(
154 &self,
155 room_id: RoomId,
156 calling_user_id: UserId,
157 calling_connection: ConnectionId,
158 called_user_id: UserId,
159 initial_project_id: Option<ProjectId>,
160 ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
161 self.room_transaction(room_id, |tx| async move {
162 let caller = room_participant::Entity::find()
163 .filter(
164 room_participant::Column::UserId
165 .eq(calling_user_id)
166 .and(room_participant::Column::RoomId.eq(room_id)),
167 )
168 .one(&*tx)
169 .await?
170 .ok_or_else(|| anyhow!("user is not in the room"))?;
171
172 let called_user_role = match caller.role.unwrap_or(ChannelRole::Member) {
173 ChannelRole::Admin | ChannelRole::Member => ChannelRole::Member,
174 ChannelRole::Guest => ChannelRole::Guest,
175 ChannelRole::Banned => return Err(anyhow!("banned users cannot invite").into()),
176 };
177
178 room_participant::ActiveModel {
179 room_id: ActiveValue::set(room_id),
180 user_id: ActiveValue::set(called_user_id),
181 answering_connection_lost: ActiveValue::set(false),
182 participant_index: ActiveValue::NotSet,
183 calling_user_id: ActiveValue::set(calling_user_id),
184 calling_connection_id: ActiveValue::set(calling_connection.id as i32),
185 calling_connection_server_id: ActiveValue::set(Some(ServerId(
186 calling_connection.owner_id as i32,
187 ))),
188 initial_project_id: ActiveValue::set(initial_project_id),
189 role: ActiveValue::set(Some(called_user_role)),
190
191 id: ActiveValue::NotSet,
192 answering_connection_id: ActiveValue::NotSet,
193 answering_connection_server_id: ActiveValue::NotSet,
194 location_kind: ActiveValue::NotSet,
195 location_project_id: ActiveValue::NotSet,
196 }
197 .insert(&*tx)
198 .await?;
199
200 let room = self.get_room(room_id, &tx).await?;
201 let incoming_call = Self::build_incoming_call(&room, called_user_id)
202 .ok_or_else(|| anyhow!("failed to build incoming call"))?;
203 Ok((room, incoming_call))
204 })
205 .await
206 }
207
208 pub async fn call_failed(
209 &self,
210 room_id: RoomId,
211 called_user_id: UserId,
212 ) -> Result<RoomGuard<proto::Room>> {
213 self.room_transaction(room_id, |tx| async move {
214 room_participant::Entity::delete_many()
215 .filter(
216 room_participant::Column::RoomId
217 .eq(room_id)
218 .and(room_participant::Column::UserId.eq(called_user_id)),
219 )
220 .exec(&*tx)
221 .await?;
222 let room = self.get_room(room_id, &tx).await?;
223 Ok(room)
224 })
225 .await
226 }
227
228 pub async fn decline_call(
229 &self,
230 expected_room_id: Option<RoomId>,
231 user_id: UserId,
232 ) -> Result<Option<RoomGuard<proto::Room>>> {
233 self.optional_room_transaction(|tx| async move {
234 let mut filter = Condition::all()
235 .add(room_participant::Column::UserId.eq(user_id))
236 .add(room_participant::Column::AnsweringConnectionId.is_null());
237 if let Some(room_id) = expected_room_id {
238 filter = filter.add(room_participant::Column::RoomId.eq(room_id));
239 }
240 let participant = room_participant::Entity::find()
241 .filter(filter)
242 .one(&*tx)
243 .await?;
244
245 let participant = if let Some(participant) = participant {
246 participant
247 } else if expected_room_id.is_some() {
248 return Err(anyhow!("could not find call to decline"))?;
249 } else {
250 return Ok(None);
251 };
252
253 let room_id = participant.room_id;
254 room_participant::Entity::delete(participant.into_active_model())
255 .exec(&*tx)
256 .await?;
257
258 let room = self.get_room(room_id, &tx).await?;
259 Ok(Some((room_id, room)))
260 })
261 .await
262 }
263
264 pub async fn cancel_call(
265 &self,
266 room_id: RoomId,
267 calling_connection: ConnectionId,
268 called_user_id: UserId,
269 ) -> Result<RoomGuard<proto::Room>> {
270 self.room_transaction(room_id, |tx| async move {
271 let participant = room_participant::Entity::find()
272 .filter(
273 Condition::all()
274 .add(room_participant::Column::UserId.eq(called_user_id))
275 .add(room_participant::Column::RoomId.eq(room_id))
276 .add(
277 room_participant::Column::CallingConnectionId
278 .eq(calling_connection.id as i32),
279 )
280 .add(
281 room_participant::Column::CallingConnectionServerId
282 .eq(calling_connection.owner_id as i32),
283 )
284 .add(room_participant::Column::AnsweringConnectionId.is_null()),
285 )
286 .one(&*tx)
287 .await?
288 .ok_or_else(|| anyhow!("no call to cancel"))?;
289
290 room_participant::Entity::delete(participant.into_active_model())
291 .exec(&*tx)
292 .await?;
293
294 let room = self.get_room(room_id, &tx).await?;
295 Ok(room)
296 })
297 .await
298 }
299
300 pub async fn join_room(
301 &self,
302 room_id: RoomId,
303 user_id: UserId,
304 connection: ConnectionId,
305 environment: &str,
306 ) -> Result<RoomGuard<JoinRoom>> {
307 self.room_transaction(room_id, |tx| async move {
308 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
309 enum QueryChannelIdAndEnvironment {
310 ChannelId,
311 Environment,
312 }
313
314 let (channel_id, release_channel): (Option<ChannelId>, Option<String>) =
315 room::Entity::find()
316 .select_only()
317 .column(room::Column::ChannelId)
318 .column(room::Column::Environment)
319 .filter(room::Column::Id.eq(room_id))
320 .into_values::<_, QueryChannelIdAndEnvironment>()
321 .one(&*tx)
322 .await?
323 .ok_or_else(|| anyhow!("no such room"))?;
324
325 if let Some(release_channel) = release_channel {
326 if &release_channel != environment {
327 Err(anyhow!("must join using the {} release", release_channel))?;
328 }
329 }
330
331 if channel_id.is_some() {
332 Err(anyhow!("tried to join channel call directly"))?
333 }
334
335 let participant_index = self
336 .get_next_participant_index_internal(room_id, &*tx)
337 .await?;
338
339 let result = room_participant::Entity::update_many()
340 .filter(
341 Condition::all()
342 .add(room_participant::Column::RoomId.eq(room_id))
343 .add(room_participant::Column::UserId.eq(user_id))
344 .add(room_participant::Column::AnsweringConnectionId.is_null()),
345 )
346 .set(room_participant::ActiveModel {
347 participant_index: ActiveValue::Set(Some(participant_index)),
348 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
349 answering_connection_server_id: ActiveValue::set(Some(ServerId(
350 connection.owner_id as i32,
351 ))),
352 answering_connection_lost: ActiveValue::set(false),
353 ..Default::default()
354 })
355 .exec(&*tx)
356 .await?;
357 if result.rows_affected == 0 {
358 Err(anyhow!("room does not exist or was already joined"))?;
359 }
360
361 let room = self.get_room(room_id, &tx).await?;
362 Ok(JoinRoom {
363 room,
364 channel_id: None,
365 channel_members: vec![],
366 })
367 })
368 .await
369 }
370
371 async fn get_next_participant_index_internal(
372 &self,
373 room_id: RoomId,
374 tx: &DatabaseTransaction,
375 ) -> Result<i32> {
376 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
377 enum QueryParticipantIndices {
378 ParticipantIndex,
379 }
380 let existing_participant_indices: Vec<i32> = room_participant::Entity::find()
381 .filter(
382 room_participant::Column::RoomId
383 .eq(room_id)
384 .and(room_participant::Column::ParticipantIndex.is_not_null()),
385 )
386 .select_only()
387 .column(room_participant::Column::ParticipantIndex)
388 .into_values::<_, QueryParticipantIndices>()
389 .all(&*tx)
390 .await?;
391
392 let mut participant_index = 0;
393 while existing_participant_indices.contains(&participant_index) {
394 participant_index += 1;
395 }
396
397 Ok(participant_index)
398 }
399
400 /// Returns the channel ID for the given room, if it has one.
401 pub async fn channel_id_for_room(&self, room_id: RoomId) -> Result<Option<ChannelId>> {
402 self.transaction(|tx| async move {
403 let room: Option<room::Model> = room::Entity::find()
404 .filter(room::Column::Id.eq(room_id))
405 .one(&*tx)
406 .await?;
407
408 Ok(room.and_then(|room| room.channel_id))
409 })
410 .await
411 }
412
413 pub(crate) async fn join_channel_room_internal(
414 &self,
415 room_id: RoomId,
416 user_id: UserId,
417 connection: ConnectionId,
418 role: ChannelRole,
419 tx: &DatabaseTransaction,
420 ) -> Result<JoinRoom> {
421 let participant_index = self
422 .get_next_participant_index_internal(room_id, &*tx)
423 .await?;
424
425 room_participant::Entity::insert_many([room_participant::ActiveModel {
426 room_id: ActiveValue::set(room_id),
427 user_id: ActiveValue::set(user_id),
428 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
429 answering_connection_server_id: ActiveValue::set(Some(ServerId(
430 connection.owner_id as i32,
431 ))),
432 answering_connection_lost: ActiveValue::set(false),
433 calling_user_id: ActiveValue::set(user_id),
434 calling_connection_id: ActiveValue::set(connection.id as i32),
435 calling_connection_server_id: ActiveValue::set(Some(ServerId(
436 connection.owner_id as i32,
437 ))),
438 participant_index: ActiveValue::Set(Some(participant_index)),
439 role: ActiveValue::set(Some(role)),
440 id: ActiveValue::NotSet,
441 location_kind: ActiveValue::NotSet,
442 location_project_id: ActiveValue::NotSet,
443 initial_project_id: ActiveValue::NotSet,
444 }])
445 .on_conflict(
446 OnConflict::columns([room_participant::Column::UserId])
447 .update_columns([
448 room_participant::Column::AnsweringConnectionId,
449 room_participant::Column::AnsweringConnectionServerId,
450 room_participant::Column::AnsweringConnectionLost,
451 room_participant::Column::ParticipantIndex,
452 room_participant::Column::Role,
453 ])
454 .to_owned(),
455 )
456 .exec(&*tx)
457 .await?;
458
459 let (channel, room) = self.get_channel_room(room_id, &tx).await?;
460 let channel = channel.ok_or_else(|| anyhow!("no channel for room"))?;
461 let channel_members = self.get_channel_participants(&channel, &*tx).await?;
462 Ok(JoinRoom {
463 room,
464 channel_id: Some(channel.id),
465 channel_members,
466 })
467 }
468
469 pub async fn rejoin_room(
470 &self,
471 rejoin_room: proto::RejoinRoom,
472 user_id: UserId,
473 connection: ConnectionId,
474 ) -> Result<RoomGuard<RejoinedRoom>> {
475 let room_id = RoomId::from_proto(rejoin_room.id);
476 self.room_transaction(room_id, |tx| async {
477 let tx = tx;
478 let participant_update = room_participant::Entity::update_many()
479 .filter(
480 Condition::all()
481 .add(room_participant::Column::RoomId.eq(room_id))
482 .add(room_participant::Column::UserId.eq(user_id))
483 .add(room_participant::Column::AnsweringConnectionId.is_not_null())
484 .add(
485 Condition::any()
486 .add(room_participant::Column::AnsweringConnectionLost.eq(true))
487 .add(
488 room_participant::Column::AnsweringConnectionServerId
489 .ne(connection.owner_id as i32),
490 ),
491 ),
492 )
493 .set(room_participant::ActiveModel {
494 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
495 answering_connection_server_id: ActiveValue::set(Some(ServerId(
496 connection.owner_id as i32,
497 ))),
498 answering_connection_lost: ActiveValue::set(false),
499 ..Default::default()
500 })
501 .exec(&*tx)
502 .await?;
503 if participant_update.rows_affected == 0 {
504 return Err(anyhow!("room does not exist or was already joined"))?;
505 }
506
507 let mut reshared_projects = Vec::new();
508 for reshared_project in &rejoin_room.reshared_projects {
509 let project_id = ProjectId::from_proto(reshared_project.project_id);
510 let project = project::Entity::find_by_id(project_id)
511 .one(&*tx)
512 .await?
513 .ok_or_else(|| anyhow!("project does not exist"))?;
514 if project.host_user_id != user_id {
515 return Err(anyhow!("no such project"))?;
516 }
517
518 let mut collaborators = project
519 .find_related(project_collaborator::Entity)
520 .all(&*tx)
521 .await?;
522 let host_ix = collaborators
523 .iter()
524 .position(|collaborator| {
525 collaborator.user_id == user_id && collaborator.is_host
526 })
527 .ok_or_else(|| anyhow!("host not found among collaborators"))?;
528 let host = collaborators.swap_remove(host_ix);
529 let old_connection_id = host.connection();
530
531 project::Entity::update(project::ActiveModel {
532 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
533 host_connection_server_id: ActiveValue::set(Some(ServerId(
534 connection.owner_id as i32,
535 ))),
536 ..project.into_active_model()
537 })
538 .exec(&*tx)
539 .await?;
540 project_collaborator::Entity::update(project_collaborator::ActiveModel {
541 connection_id: ActiveValue::set(connection.id as i32),
542 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
543 ..host.into_active_model()
544 })
545 .exec(&*tx)
546 .await?;
547
548 self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
549 .await?;
550
551 reshared_projects.push(ResharedProject {
552 id: project_id,
553 old_connection_id,
554 collaborators: collaborators
555 .iter()
556 .map(|collaborator| ProjectCollaborator {
557 connection_id: collaborator.connection(),
558 user_id: collaborator.user_id,
559 replica_id: collaborator.replica_id,
560 is_host: collaborator.is_host,
561 })
562 .collect(),
563 worktrees: reshared_project.worktrees.clone(),
564 });
565 }
566
567 project::Entity::delete_many()
568 .filter(
569 Condition::all()
570 .add(project::Column::RoomId.eq(room_id))
571 .add(project::Column::HostUserId.eq(user_id))
572 .add(
573 project::Column::Id
574 .is_not_in(reshared_projects.iter().map(|project| project.id)),
575 ),
576 )
577 .exec(&*tx)
578 .await?;
579
580 let mut rejoined_projects = Vec::new();
581 for rejoined_project in &rejoin_room.rejoined_projects {
582 let project_id = ProjectId::from_proto(rejoined_project.id);
583 let Some(project) = project::Entity::find_by_id(project_id).one(&*tx).await? else {
584 continue;
585 };
586
587 let mut worktrees = Vec::new();
588 let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
589 for db_worktree in db_worktrees {
590 let mut worktree = RejoinedWorktree {
591 id: db_worktree.id as u64,
592 abs_path: db_worktree.abs_path,
593 root_name: db_worktree.root_name,
594 visible: db_worktree.visible,
595 updated_entries: Default::default(),
596 removed_entries: Default::default(),
597 updated_repositories: Default::default(),
598 removed_repositories: Default::default(),
599 diagnostic_summaries: Default::default(),
600 settings_files: Default::default(),
601 scan_id: db_worktree.scan_id as u64,
602 completed_scan_id: db_worktree.completed_scan_id as u64,
603 };
604
605 let rejoined_worktree = rejoined_project
606 .worktrees
607 .iter()
608 .find(|worktree| worktree.id == db_worktree.id as u64);
609
610 // File entries
611 {
612 let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
613 worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
614 } else {
615 worktree_entry::Column::IsDeleted.eq(false)
616 };
617
618 let mut db_entries = worktree_entry::Entity::find()
619 .filter(
620 Condition::all()
621 .add(worktree_entry::Column::ProjectId.eq(project.id))
622 .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
623 .add(entry_filter),
624 )
625 .stream(&*tx)
626 .await?;
627
628 while let Some(db_entry) = db_entries.next().await {
629 let db_entry = db_entry?;
630 if db_entry.is_deleted {
631 worktree.removed_entries.push(db_entry.id as u64);
632 } else {
633 worktree.updated_entries.push(proto::Entry {
634 id: db_entry.id as u64,
635 is_dir: db_entry.is_dir,
636 path: db_entry.path,
637 inode: db_entry.inode as u64,
638 mtime: Some(proto::Timestamp {
639 seconds: db_entry.mtime_seconds as u64,
640 nanos: db_entry.mtime_nanos as u32,
641 }),
642 is_symlink: db_entry.is_symlink,
643 is_ignored: db_entry.is_ignored,
644 is_external: db_entry.is_external,
645 git_status: db_entry.git_status.map(|status| status as i32),
646 });
647 }
648 }
649 }
650
651 // Repository Entries
652 {
653 let repository_entry_filter =
654 if let Some(rejoined_worktree) = rejoined_worktree {
655 worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
656 } else {
657 worktree_repository::Column::IsDeleted.eq(false)
658 };
659
660 let mut db_repositories = worktree_repository::Entity::find()
661 .filter(
662 Condition::all()
663 .add(worktree_repository::Column::ProjectId.eq(project.id))
664 .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
665 .add(repository_entry_filter),
666 )
667 .stream(&*tx)
668 .await?;
669
670 while let Some(db_repository) = db_repositories.next().await {
671 let db_repository = db_repository?;
672 if db_repository.is_deleted {
673 worktree
674 .removed_repositories
675 .push(db_repository.work_directory_id as u64);
676 } else {
677 worktree.updated_repositories.push(proto::RepositoryEntry {
678 work_directory_id: db_repository.work_directory_id as u64,
679 branch: db_repository.branch,
680 });
681 }
682 }
683 }
684
685 worktrees.push(worktree);
686 }
687
688 let language_servers = project
689 .find_related(language_server::Entity)
690 .all(&*tx)
691 .await?
692 .into_iter()
693 .map(|language_server| proto::LanguageServer {
694 id: language_server.id as u64,
695 name: language_server.name,
696 })
697 .collect::<Vec<_>>();
698
699 {
700 let mut db_settings_files = worktree_settings_file::Entity::find()
701 .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
702 .stream(&*tx)
703 .await?;
704 while let Some(db_settings_file) = db_settings_files.next().await {
705 let db_settings_file = db_settings_file?;
706 if let Some(worktree) = worktrees
707 .iter_mut()
708 .find(|w| w.id == db_settings_file.worktree_id as u64)
709 {
710 worktree.settings_files.push(WorktreeSettingsFile {
711 path: db_settings_file.path,
712 content: db_settings_file.content,
713 });
714 }
715 }
716 }
717
718 let mut collaborators = project
719 .find_related(project_collaborator::Entity)
720 .all(&*tx)
721 .await?;
722 let self_collaborator = if let Some(self_collaborator_ix) = collaborators
723 .iter()
724 .position(|collaborator| collaborator.user_id == user_id)
725 {
726 collaborators.swap_remove(self_collaborator_ix)
727 } else {
728 continue;
729 };
730 let old_connection_id = self_collaborator.connection();
731 project_collaborator::Entity::update(project_collaborator::ActiveModel {
732 connection_id: ActiveValue::set(connection.id as i32),
733 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
734 ..self_collaborator.into_active_model()
735 })
736 .exec(&*tx)
737 .await?;
738
739 let collaborators = collaborators
740 .into_iter()
741 .map(|collaborator| ProjectCollaborator {
742 connection_id: collaborator.connection(),
743 user_id: collaborator.user_id,
744 replica_id: collaborator.replica_id,
745 is_host: collaborator.is_host,
746 })
747 .collect::<Vec<_>>();
748
749 rejoined_projects.push(RejoinedProject {
750 id: project_id,
751 old_connection_id,
752 collaborators,
753 worktrees,
754 language_servers,
755 });
756 }
757
758 let (channel, room) = self.get_channel_room(room_id, &tx).await?;
759 let channel_members = if let Some(channel) = &channel {
760 self.get_channel_participants(&channel, &tx).await?
761 } else {
762 Vec::new()
763 };
764
765 Ok(RejoinedRoom {
766 room,
767 channel_id: channel.map(|channel| channel.id),
768 channel_members,
769 rejoined_projects,
770 reshared_projects,
771 })
772 })
773 .await
774 }
775
776 pub async fn leave_room(
777 &self,
778 connection: ConnectionId,
779 ) -> Result<Option<RoomGuard<LeftRoom>>> {
780 self.optional_room_transaction(|tx| async move {
781 let leaving_participant = room_participant::Entity::find()
782 .filter(
783 Condition::all()
784 .add(
785 room_participant::Column::AnsweringConnectionId
786 .eq(connection.id as i32),
787 )
788 .add(
789 room_participant::Column::AnsweringConnectionServerId
790 .eq(connection.owner_id as i32),
791 ),
792 )
793 .one(&*tx)
794 .await?;
795
796 if let Some(leaving_participant) = leaving_participant {
797 // Leave room.
798 let room_id = leaving_participant.room_id;
799 room_participant::Entity::delete_by_id(leaving_participant.id)
800 .exec(&*tx)
801 .await?;
802
803 // Cancel pending calls initiated by the leaving user.
804 let called_participants = room_participant::Entity::find()
805 .filter(
806 Condition::all()
807 .add(
808 room_participant::Column::CallingUserId
809 .eq(leaving_participant.user_id),
810 )
811 .add(room_participant::Column::AnsweringConnectionId.is_null()),
812 )
813 .all(&*tx)
814 .await?;
815 room_participant::Entity::delete_many()
816 .filter(
817 room_participant::Column::Id
818 .is_in(called_participants.iter().map(|participant| participant.id)),
819 )
820 .exec(&*tx)
821 .await?;
822 let canceled_calls_to_user_ids = called_participants
823 .into_iter()
824 .map(|participant| participant.user_id)
825 .collect();
826
827 // Detect left projects.
828 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
829 enum QueryProjectIds {
830 ProjectId,
831 }
832 let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
833 .select_only()
834 .column_as(
835 project_collaborator::Column::ProjectId,
836 QueryProjectIds::ProjectId,
837 )
838 .filter(
839 Condition::all()
840 .add(
841 project_collaborator::Column::ConnectionId.eq(connection.id as i32),
842 )
843 .add(
844 project_collaborator::Column::ConnectionServerId
845 .eq(connection.owner_id as i32),
846 ),
847 )
848 .into_values::<_, QueryProjectIds>()
849 .all(&*tx)
850 .await?;
851 let mut left_projects = HashMap::default();
852 let mut collaborators = project_collaborator::Entity::find()
853 .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
854 .stream(&*tx)
855 .await?;
856 while let Some(collaborator) = collaborators.next().await {
857 let collaborator = collaborator?;
858 let left_project =
859 left_projects
860 .entry(collaborator.project_id)
861 .or_insert(LeftProject {
862 id: collaborator.project_id,
863 host_user_id: Default::default(),
864 connection_ids: Default::default(),
865 host_connection_id: Default::default(),
866 });
867
868 let collaborator_connection_id = collaborator.connection();
869 if collaborator_connection_id != connection {
870 left_project.connection_ids.push(collaborator_connection_id);
871 }
872
873 if collaborator.is_host {
874 left_project.host_user_id = collaborator.user_id;
875 left_project.host_connection_id = collaborator_connection_id;
876 }
877 }
878 drop(collaborators);
879
880 // Leave projects.
881 project_collaborator::Entity::delete_many()
882 .filter(
883 Condition::all()
884 .add(
885 project_collaborator::Column::ConnectionId.eq(connection.id as i32),
886 )
887 .add(
888 project_collaborator::Column::ConnectionServerId
889 .eq(connection.owner_id as i32),
890 ),
891 )
892 .exec(&*tx)
893 .await?;
894
895 follower::Entity::delete_many()
896 .filter(
897 Condition::all()
898 .add(follower::Column::FollowerConnectionId.eq(connection.id as i32)),
899 )
900 .exec(&*tx)
901 .await?;
902
903 // Unshare projects.
904 project::Entity::delete_many()
905 .filter(
906 Condition::all()
907 .add(project::Column::RoomId.eq(room_id))
908 .add(project::Column::HostConnectionId.eq(connection.id as i32))
909 .add(
910 project::Column::HostConnectionServerId
911 .eq(connection.owner_id as i32),
912 ),
913 )
914 .exec(&*tx)
915 .await?;
916
917 let (channel, room) = self.get_channel_room(room_id, &tx).await?;
918 let deleted = if room.participants.is_empty() {
919 let result = room::Entity::delete_by_id(room_id).exec(&*tx).await?;
920 result.rows_affected > 0
921 } else {
922 false
923 };
924
925 let channel_members = if let Some(channel) = &channel {
926 self.get_channel_participants(channel, &tx).await?
927 } else {
928 Vec::new()
929 };
930 let left_room = LeftRoom {
931 room,
932 channel_id: channel.map(|channel| channel.id),
933 channel_members,
934 left_projects,
935 canceled_calls_to_user_ids,
936 deleted,
937 };
938
939 if left_room.room.participants.is_empty() {
940 self.rooms.remove(&room_id);
941 }
942
943 Ok(Some((room_id, left_room)))
944 } else {
945 Ok(None)
946 }
947 })
948 .await
949 }
950
951 /// Updates the location of a participant in the given room.
952 pub async fn update_room_participant_location(
953 &self,
954 room_id: RoomId,
955 connection: ConnectionId,
956 location: proto::ParticipantLocation,
957 ) -> Result<RoomGuard<proto::Room>> {
958 self.room_transaction(room_id, |tx| async {
959 let tx = tx;
960 let location_kind;
961 let location_project_id;
962 match location
963 .variant
964 .as_ref()
965 .ok_or_else(|| anyhow!("invalid location"))?
966 {
967 proto::participant_location::Variant::SharedProject(project) => {
968 location_kind = 0;
969 location_project_id = Some(ProjectId::from_proto(project.id));
970 }
971 proto::participant_location::Variant::UnsharedProject(_) => {
972 location_kind = 1;
973 location_project_id = None;
974 }
975 proto::participant_location::Variant::External(_) => {
976 location_kind = 2;
977 location_project_id = None;
978 }
979 }
980
981 let result = room_participant::Entity::update_many()
982 .filter(
983 Condition::all()
984 .add(room_participant::Column::RoomId.eq(room_id))
985 .add(
986 room_participant::Column::AnsweringConnectionId
987 .eq(connection.id as i32),
988 )
989 .add(
990 room_participant::Column::AnsweringConnectionServerId
991 .eq(connection.owner_id as i32),
992 ),
993 )
994 .set(room_participant::ActiveModel {
995 location_kind: ActiveValue::set(Some(location_kind)),
996 location_project_id: ActiveValue::set(location_project_id),
997 ..Default::default()
998 })
999 .exec(&*tx)
1000 .await?;
1001
1002 if result.rows_affected == 1 {
1003 let room = self.get_room(room_id, &tx).await?;
1004 Ok(room)
1005 } else {
1006 Err(anyhow!("could not update room participant location"))?
1007 }
1008 })
1009 .await
1010 }
1011
1012 /// Sets the role of a participant in the given room.
1013 pub async fn set_room_participant_role(
1014 &self,
1015 admin_id: UserId,
1016 room_id: RoomId,
1017 user_id: UserId,
1018 role: ChannelRole,
1019 ) -> Result<RoomGuard<proto::Room>> {
1020 self.room_transaction(room_id, |tx| async move {
1021 room_participant::Entity::find()
1022 .filter(
1023 Condition::all()
1024 .add(room_participant::Column::RoomId.eq(room_id))
1025 .add(room_participant::Column::UserId.eq(admin_id))
1026 .add(room_participant::Column::Role.eq(ChannelRole::Admin)),
1027 )
1028 .one(&*tx)
1029 .await?
1030 .ok_or_else(|| anyhow!("only admins can set participant role"))?;
1031
1032 let result = room_participant::Entity::update_many()
1033 .filter(
1034 Condition::all()
1035 .add(room_participant::Column::RoomId.eq(room_id))
1036 .add(room_participant::Column::UserId.eq(user_id)),
1037 )
1038 .set(room_participant::ActiveModel {
1039 role: ActiveValue::set(Some(ChannelRole::from(role))),
1040 ..Default::default()
1041 })
1042 .exec(&*tx)
1043 .await?;
1044
1045 if result.rows_affected != 1 {
1046 Err(anyhow!("could not update room participant role"))?;
1047 }
1048 Ok(self.get_room(room_id, &tx).await?)
1049 })
1050 .await
1051 }
1052
1053 pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
1054 self.transaction(|tx| async move {
1055 self.room_connection_lost(connection, &*tx).await?;
1056 self.channel_buffer_connection_lost(connection, &*tx)
1057 .await?;
1058 self.channel_chat_connection_lost(connection, &*tx).await?;
1059 Ok(())
1060 })
1061 .await
1062 }
1063
1064 pub async fn room_connection_lost(
1065 &self,
1066 connection: ConnectionId,
1067 tx: &DatabaseTransaction,
1068 ) -> Result<()> {
1069 let participant = room_participant::Entity::find()
1070 .filter(
1071 Condition::all()
1072 .add(room_participant::Column::AnsweringConnectionId.eq(connection.id as i32))
1073 .add(
1074 room_participant::Column::AnsweringConnectionServerId
1075 .eq(connection.owner_id as i32),
1076 ),
1077 )
1078 .one(&*tx)
1079 .await?;
1080
1081 if let Some(participant) = participant {
1082 room_participant::Entity::update(room_participant::ActiveModel {
1083 answering_connection_lost: ActiveValue::set(true),
1084 ..participant.into_active_model()
1085 })
1086 .exec(&*tx)
1087 .await?;
1088 }
1089 Ok(())
1090 }
1091
1092 fn build_incoming_call(
1093 room: &proto::Room,
1094 called_user_id: UserId,
1095 ) -> Option<proto::IncomingCall> {
1096 let pending_participant = room
1097 .pending_participants
1098 .iter()
1099 .find(|participant| participant.user_id == called_user_id.to_proto())?;
1100
1101 Some(proto::IncomingCall {
1102 room_id: room.id,
1103 calling_user_id: pending_participant.calling_user_id,
1104 participant_user_ids: room
1105 .participants
1106 .iter()
1107 .map(|participant| participant.user_id)
1108 .collect(),
1109 initial_project: room.participants.iter().find_map(|participant| {
1110 let initial_project_id = pending_participant.initial_project_id?;
1111 participant
1112 .projects
1113 .iter()
1114 .find(|project| project.id == initial_project_id)
1115 .cloned()
1116 }),
1117 })
1118 }
1119
1120 pub async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1121 let (_, room) = self.get_channel_room(room_id, tx).await?;
1122 Ok(room)
1123 }
1124
1125 pub async fn room_connection_ids(
1126 &self,
1127 room_id: RoomId,
1128 connection_id: ConnectionId,
1129 ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
1130 self.room_transaction(room_id, |tx| async move {
1131 let mut participants = room_participant::Entity::find()
1132 .filter(room_participant::Column::RoomId.eq(room_id))
1133 .stream(&*tx)
1134 .await?;
1135
1136 let mut is_participant = false;
1137 let mut connection_ids = HashSet::default();
1138 while let Some(participant) = participants.next().await {
1139 let participant = participant?;
1140 if let Some(answering_connection) = participant.answering_connection() {
1141 if answering_connection == connection_id {
1142 is_participant = true;
1143 } else {
1144 connection_ids.insert(answering_connection);
1145 }
1146 }
1147 }
1148
1149 if !is_participant {
1150 Err(anyhow!("not a room participant"))?;
1151 }
1152
1153 Ok(connection_ids)
1154 })
1155 .await
1156 }
1157
1158 async fn get_channel_room(
1159 &self,
1160 room_id: RoomId,
1161 tx: &DatabaseTransaction,
1162 ) -> Result<(Option<channel::Model>, proto::Room)> {
1163 let db_room = room::Entity::find_by_id(room_id)
1164 .one(tx)
1165 .await?
1166 .ok_or_else(|| anyhow!("could not find room"))?;
1167
1168 let mut db_participants = db_room
1169 .find_related(room_participant::Entity)
1170 .stream(tx)
1171 .await?;
1172 let mut participants = HashMap::default();
1173 let mut pending_participants = Vec::new();
1174 while let Some(db_participant) = db_participants.next().await {
1175 let db_participant = db_participant?;
1176 if let (
1177 Some(answering_connection_id),
1178 Some(answering_connection_server_id),
1179 Some(participant_index),
1180 ) = (
1181 db_participant.answering_connection_id,
1182 db_participant.answering_connection_server_id,
1183 db_participant.participant_index,
1184 ) {
1185 let location = match (
1186 db_participant.location_kind,
1187 db_participant.location_project_id,
1188 ) {
1189 (Some(0), Some(project_id)) => {
1190 Some(proto::participant_location::Variant::SharedProject(
1191 proto::participant_location::SharedProject {
1192 id: project_id.to_proto(),
1193 },
1194 ))
1195 }
1196 (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1197 Default::default(),
1198 )),
1199 _ => Some(proto::participant_location::Variant::External(
1200 Default::default(),
1201 )),
1202 };
1203
1204 let answering_connection = ConnectionId {
1205 owner_id: answering_connection_server_id.0 as u32,
1206 id: answering_connection_id as u32,
1207 };
1208 participants.insert(
1209 answering_connection,
1210 proto::Participant {
1211 user_id: db_participant.user_id.to_proto(),
1212 peer_id: Some(answering_connection.into()),
1213 projects: Default::default(),
1214 location: Some(proto::ParticipantLocation { variant: location }),
1215 participant_index: participant_index as u32,
1216 role: db_participant.role.unwrap_or(ChannelRole::Member).into(),
1217 },
1218 );
1219 } else {
1220 pending_participants.push(proto::PendingParticipant {
1221 user_id: db_participant.user_id.to_proto(),
1222 calling_user_id: db_participant.calling_user_id.to_proto(),
1223 initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1224 });
1225 }
1226 }
1227 drop(db_participants);
1228
1229 let mut db_projects = db_room
1230 .find_related(project::Entity)
1231 .find_with_related(worktree::Entity)
1232 .stream(tx)
1233 .await?;
1234
1235 while let Some(row) = db_projects.next().await {
1236 let (db_project, db_worktree) = row?;
1237 let host_connection = db_project.host_connection()?;
1238 if let Some(participant) = participants.get_mut(&host_connection) {
1239 let project = if let Some(project) = participant
1240 .projects
1241 .iter_mut()
1242 .find(|project| project.id == db_project.id.to_proto())
1243 {
1244 project
1245 } else {
1246 participant.projects.push(proto::ParticipantProject {
1247 id: db_project.id.to_proto(),
1248 worktree_root_names: Default::default(),
1249 });
1250 participant.projects.last_mut().unwrap()
1251 };
1252
1253 if let Some(db_worktree) = db_worktree {
1254 if db_worktree.visible {
1255 project.worktree_root_names.push(db_worktree.root_name);
1256 }
1257 }
1258 }
1259 }
1260 drop(db_projects);
1261
1262 let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
1263 let mut followers = Vec::new();
1264 while let Some(db_follower) = db_followers.next().await {
1265 let db_follower = db_follower?;
1266 followers.push(proto::Follower {
1267 leader_id: Some(db_follower.leader_connection().into()),
1268 follower_id: Some(db_follower.follower_connection().into()),
1269 project_id: db_follower.project_id.to_proto(),
1270 });
1271 }
1272 drop(db_followers);
1273
1274 let channel = if let Some(channel_id) = db_room.channel_id {
1275 Some(self.get_channel_internal(channel_id, &*tx).await?)
1276 } else {
1277 None
1278 };
1279
1280 Ok((
1281 channel,
1282 proto::Room {
1283 id: db_room.id.to_proto(),
1284 live_kit_room: db_room.live_kit_room,
1285 participants: participants.into_values().collect(),
1286 pending_participants,
1287 followers,
1288 },
1289 ))
1290 }
1291}