1use super::*;
2
3impl Database {
4 /// Clears all room participants in rooms attached to a stale server.
5 pub async fn clear_stale_room_participants(
6 &self,
7 room_id: RoomId,
8 new_server_id: ServerId,
9 ) -> Result<RoomGuard<RefreshedRoom>> {
10 self.room_transaction(room_id, |tx| async move {
11 let stale_participant_filter = Condition::all()
12 .add(room_participant::Column::RoomId.eq(room_id))
13 .add(room_participant::Column::AnsweringConnectionId.is_not_null())
14 .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
15
16 let stale_participant_user_ids = room_participant::Entity::find()
17 .filter(stale_participant_filter.clone())
18 .all(&*tx)
19 .await?
20 .into_iter()
21 .map(|participant| participant.user_id)
22 .collect::<Vec<_>>();
23
24 // Delete participants who failed to reconnect and cancel their calls.
25 let mut canceled_calls_to_user_ids = Vec::new();
26 room_participant::Entity::delete_many()
27 .filter(stale_participant_filter)
28 .exec(&*tx)
29 .await?;
30 let called_participants = room_participant::Entity::find()
31 .filter(
32 Condition::all()
33 .add(
34 room_participant::Column::CallingUserId
35 .is_in(stale_participant_user_ids.iter().copied()),
36 )
37 .add(room_participant::Column::AnsweringConnectionId.is_null()),
38 )
39 .all(&*tx)
40 .await?;
41 room_participant::Entity::delete_many()
42 .filter(
43 room_participant::Column::Id
44 .is_in(called_participants.iter().map(|participant| participant.id)),
45 )
46 .exec(&*tx)
47 .await?;
48 canceled_calls_to_user_ids.extend(
49 called_participants
50 .into_iter()
51 .map(|participant| participant.user_id),
52 );
53
54 let (channel, room) = self.get_channel_room(room_id, &tx).await?;
55 if channel.is_none() {
56 // Delete the room if it becomes empty.
57 if room.participants.is_empty() {
58 project::Entity::delete_many()
59 .filter(project::Column::RoomId.eq(room_id))
60 .exec(&*tx)
61 .await?;
62 room::Entity::delete_by_id(room_id).exec(&*tx).await?;
63 }
64 };
65
66 Ok(RefreshedRoom {
67 room,
68 channel,
69 stale_participant_user_ids,
70 canceled_calls_to_user_ids,
71 })
72 })
73 .await
74 }
75
76 /// Returns the incoming calls for user with the given ID.
77 pub async fn incoming_call_for_user(
78 &self,
79 user_id: UserId,
80 ) -> Result<Option<proto::IncomingCall>> {
81 self.transaction(|tx| async move {
82 let pending_participant = room_participant::Entity::find()
83 .filter(
84 room_participant::Column::UserId
85 .eq(user_id)
86 .and(room_participant::Column::AnsweringConnectionId.is_null()),
87 )
88 .one(&*tx)
89 .await?;
90
91 if let Some(pending_participant) = pending_participant {
92 let room = self.get_room(pending_participant.room_id, &tx).await?;
93 Ok(Self::build_incoming_call(&room, user_id))
94 } else {
95 Ok(None)
96 }
97 })
98 .await
99 }
100
101 /// Creates a new room.
102 pub async fn create_room(
103 &self,
104 user_id: UserId,
105 connection: ConnectionId,
106 live_kit_room: &str,
107 ) -> Result<proto::Room> {
108 self.transaction(|tx| async move {
109 let room = room::ActiveModel {
110 live_kit_room: ActiveValue::set(live_kit_room.into()),
111 ..Default::default()
112 }
113 .insert(&*tx)
114 .await?;
115 room_participant::ActiveModel {
116 room_id: ActiveValue::set(room.id),
117 user_id: ActiveValue::set(user_id),
118 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
119 answering_connection_server_id: ActiveValue::set(Some(ServerId(
120 connection.owner_id as i32,
121 ))),
122 answering_connection_lost: ActiveValue::set(false),
123 calling_user_id: ActiveValue::set(user_id),
124 calling_connection_id: ActiveValue::set(connection.id as i32),
125 calling_connection_server_id: ActiveValue::set(Some(ServerId(
126 connection.owner_id as i32,
127 ))),
128 participant_index: ActiveValue::set(Some(0)),
129 role: ActiveValue::set(Some(ChannelRole::Admin)),
130
131 id: ActiveValue::NotSet,
132 location_kind: ActiveValue::NotSet,
133 location_project_id: ActiveValue::NotSet,
134 initial_project_id: ActiveValue::NotSet,
135 }
136 .insert(&*tx)
137 .await?;
138
139 let room = self.get_room(room.id, &tx).await?;
140 Ok(room)
141 })
142 .await
143 }
144
145 pub async fn call(
146 &self,
147 room_id: RoomId,
148 calling_user_id: UserId,
149 calling_connection: ConnectionId,
150 called_user_id: UserId,
151 initial_project_id: Option<ProjectId>,
152 ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
153 self.room_transaction(room_id, |tx| async move {
154 let caller = room_participant::Entity::find()
155 .filter(
156 room_participant::Column::UserId
157 .eq(calling_user_id)
158 .and(room_participant::Column::RoomId.eq(room_id)),
159 )
160 .one(&*tx)
161 .await?
162 .ok_or_else(|| anyhow!("user is not in the room"))?;
163
164 let called_user_role = match caller.role.unwrap_or(ChannelRole::Member) {
165 ChannelRole::Admin | ChannelRole::Member => ChannelRole::Member,
166 ChannelRole::Guest | ChannelRole::Talker => ChannelRole::Guest,
167 ChannelRole::Banned => return Err(anyhow!("banned users cannot invite").into()),
168 };
169
170 room_participant::ActiveModel {
171 room_id: ActiveValue::set(room_id),
172 user_id: ActiveValue::set(called_user_id),
173 answering_connection_lost: ActiveValue::set(false),
174 participant_index: ActiveValue::NotSet,
175 calling_user_id: ActiveValue::set(calling_user_id),
176 calling_connection_id: ActiveValue::set(calling_connection.id as i32),
177 calling_connection_server_id: ActiveValue::set(Some(ServerId(
178 calling_connection.owner_id as i32,
179 ))),
180 initial_project_id: ActiveValue::set(initial_project_id),
181 role: ActiveValue::set(Some(called_user_role)),
182
183 id: ActiveValue::NotSet,
184 answering_connection_id: ActiveValue::NotSet,
185 answering_connection_server_id: ActiveValue::NotSet,
186 location_kind: ActiveValue::NotSet,
187 location_project_id: ActiveValue::NotSet,
188 }
189 .insert(&*tx)
190 .await?;
191
192 let room = self.get_room(room_id, &tx).await?;
193 let incoming_call = Self::build_incoming_call(&room, called_user_id)
194 .ok_or_else(|| anyhow!("failed to build incoming call"))?;
195 Ok((room, incoming_call))
196 })
197 .await
198 }
199
200 pub async fn call_failed(
201 &self,
202 room_id: RoomId,
203 called_user_id: UserId,
204 ) -> Result<RoomGuard<proto::Room>> {
205 self.room_transaction(room_id, |tx| async move {
206 room_participant::Entity::delete_many()
207 .filter(
208 room_participant::Column::RoomId
209 .eq(room_id)
210 .and(room_participant::Column::UserId.eq(called_user_id)),
211 )
212 .exec(&*tx)
213 .await?;
214 let room = self.get_room(room_id, &tx).await?;
215 Ok(room)
216 })
217 .await
218 }
219
220 pub async fn decline_call(
221 &self,
222 expected_room_id: Option<RoomId>,
223 user_id: UserId,
224 ) -> Result<Option<RoomGuard<proto::Room>>> {
225 self.optional_room_transaction(|tx| async move {
226 let mut filter = Condition::all()
227 .add(room_participant::Column::UserId.eq(user_id))
228 .add(room_participant::Column::AnsweringConnectionId.is_null());
229 if let Some(room_id) = expected_room_id {
230 filter = filter.add(room_participant::Column::RoomId.eq(room_id));
231 }
232 let participant = room_participant::Entity::find()
233 .filter(filter)
234 .one(&*tx)
235 .await?;
236
237 let participant = if let Some(participant) = participant {
238 participant
239 } else if expected_room_id.is_some() {
240 return Err(anyhow!("could not find call to decline"))?;
241 } else {
242 return Ok(None);
243 };
244
245 let room_id = participant.room_id;
246 room_participant::Entity::delete(participant.into_active_model())
247 .exec(&*tx)
248 .await?;
249
250 let room = self.get_room(room_id, &tx).await?;
251 Ok(Some((room_id, room)))
252 })
253 .await
254 }
255
256 pub async fn cancel_call(
257 &self,
258 room_id: RoomId,
259 calling_connection: ConnectionId,
260 called_user_id: UserId,
261 ) -> Result<RoomGuard<proto::Room>> {
262 self.room_transaction(room_id, |tx| async move {
263 let participant = room_participant::Entity::find()
264 .filter(
265 Condition::all()
266 .add(room_participant::Column::UserId.eq(called_user_id))
267 .add(room_participant::Column::RoomId.eq(room_id))
268 .add(
269 room_participant::Column::CallingConnectionId
270 .eq(calling_connection.id as i32),
271 )
272 .add(
273 room_participant::Column::CallingConnectionServerId
274 .eq(calling_connection.owner_id as i32),
275 )
276 .add(room_participant::Column::AnsweringConnectionId.is_null()),
277 )
278 .one(&*tx)
279 .await?
280 .ok_or_else(|| anyhow!("no call to cancel"))?;
281
282 room_participant::Entity::delete(participant.into_active_model())
283 .exec(&*tx)
284 .await?;
285
286 let room = self.get_room(room_id, &tx).await?;
287 Ok(room)
288 })
289 .await
290 }
291
292 pub async fn join_room(
293 &self,
294 room_id: RoomId,
295 user_id: UserId,
296 connection: ConnectionId,
297 ) -> Result<RoomGuard<JoinRoom>> {
298 self.room_transaction(room_id, |tx| async move {
299 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
300 enum QueryChannelId {
301 ChannelId,
302 }
303
304 let channel_id: Option<ChannelId> = room::Entity::find()
305 .select_only()
306 .column(room::Column::ChannelId)
307 .filter(room::Column::Id.eq(room_id))
308 .into_values::<_, QueryChannelId>()
309 .one(&*tx)
310 .await?
311 .ok_or_else(|| anyhow!("no such room"))?;
312
313 if channel_id.is_some() {
314 Err(anyhow!("tried to join channel call directly"))?
315 }
316
317 let participant_index = self
318 .get_next_participant_index_internal(room_id, &tx)
319 .await?;
320
321 let result = room_participant::Entity::update_many()
322 .filter(
323 Condition::all()
324 .add(room_participant::Column::RoomId.eq(room_id))
325 .add(room_participant::Column::UserId.eq(user_id))
326 .add(room_participant::Column::AnsweringConnectionId.is_null()),
327 )
328 .set(room_participant::ActiveModel {
329 participant_index: ActiveValue::Set(Some(participant_index)),
330 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
331 answering_connection_server_id: ActiveValue::set(Some(ServerId(
332 connection.owner_id as i32,
333 ))),
334 answering_connection_lost: ActiveValue::set(false),
335 ..Default::default()
336 })
337 .exec(&*tx)
338 .await?;
339 if result.rows_affected == 0 {
340 Err(anyhow!("room does not exist or was already joined"))?;
341 }
342
343 let room = self.get_room(room_id, &tx).await?;
344 Ok(JoinRoom {
345 room,
346 channel: None,
347 })
348 })
349 .await
350 }
351
352 async fn get_next_participant_index_internal(
353 &self,
354 room_id: RoomId,
355 tx: &DatabaseTransaction,
356 ) -> Result<i32> {
357 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
358 enum QueryParticipantIndices {
359 ParticipantIndex,
360 }
361 let existing_participant_indices: Vec<i32> = room_participant::Entity::find()
362 .filter(
363 room_participant::Column::RoomId
364 .eq(room_id)
365 .and(room_participant::Column::ParticipantIndex.is_not_null()),
366 )
367 .select_only()
368 .column(room_participant::Column::ParticipantIndex)
369 .into_values::<_, QueryParticipantIndices>()
370 .all(tx)
371 .await?;
372
373 let mut participant_index = 0;
374 while existing_participant_indices.contains(&participant_index) {
375 participant_index += 1;
376 }
377
378 Ok(participant_index)
379 }
380
381 /// Returns the channel ID for the given room, if it has one.
382 pub async fn channel_id_for_room(&self, room_id: RoomId) -> Result<Option<ChannelId>> {
383 self.transaction(|tx| async move {
384 let room: Option<room::Model> = room::Entity::find()
385 .filter(room::Column::Id.eq(room_id))
386 .one(&*tx)
387 .await?;
388
389 Ok(room.and_then(|room| room.channel_id))
390 })
391 .await
392 }
393
394 pub(crate) async fn join_channel_room_internal(
395 &self,
396 room_id: RoomId,
397 user_id: UserId,
398 connection: ConnectionId,
399 role: ChannelRole,
400 tx: &DatabaseTransaction,
401 ) -> Result<JoinRoom> {
402 let participant_index = self
403 .get_next_participant_index_internal(room_id, tx)
404 .await?;
405
406 room_participant::Entity::insert_many([room_participant::ActiveModel {
407 room_id: ActiveValue::set(room_id),
408 user_id: ActiveValue::set(user_id),
409 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
410 answering_connection_server_id: ActiveValue::set(Some(ServerId(
411 connection.owner_id as i32,
412 ))),
413 answering_connection_lost: ActiveValue::set(false),
414 calling_user_id: ActiveValue::set(user_id),
415 calling_connection_id: ActiveValue::set(connection.id as i32),
416 calling_connection_server_id: ActiveValue::set(Some(ServerId(
417 connection.owner_id as i32,
418 ))),
419 participant_index: ActiveValue::Set(Some(participant_index)),
420 role: ActiveValue::set(Some(role)),
421 id: ActiveValue::NotSet,
422 location_kind: ActiveValue::NotSet,
423 location_project_id: ActiveValue::NotSet,
424 initial_project_id: ActiveValue::NotSet,
425 }])
426 .on_conflict(
427 OnConflict::columns([room_participant::Column::UserId])
428 .update_columns([
429 room_participant::Column::AnsweringConnectionId,
430 room_participant::Column::AnsweringConnectionServerId,
431 room_participant::Column::AnsweringConnectionLost,
432 room_participant::Column::ParticipantIndex,
433 room_participant::Column::Role,
434 ])
435 .to_owned(),
436 )
437 .exec(tx)
438 .await?;
439
440 let (channel, room) = self.get_channel_room(room_id, &tx).await?;
441 let channel = channel.ok_or_else(|| anyhow!("no channel for room"))?;
442 Ok(JoinRoom {
443 room,
444 channel: Some(channel),
445 })
446 }
447
448 pub async fn rejoin_room(
449 &self,
450 rejoin_room: proto::RejoinRoom,
451 user_id: UserId,
452 connection: ConnectionId,
453 ) -> Result<RoomGuard<RejoinedRoom>> {
454 let room_id = RoomId::from_proto(rejoin_room.id);
455 self.room_transaction(room_id, |tx| async {
456 let tx = tx;
457 let participant_update = room_participant::Entity::update_many()
458 .filter(
459 Condition::all()
460 .add(room_participant::Column::RoomId.eq(room_id))
461 .add(room_participant::Column::UserId.eq(user_id))
462 .add(room_participant::Column::AnsweringConnectionId.is_not_null()),
463 )
464 .set(room_participant::ActiveModel {
465 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
466 answering_connection_server_id: ActiveValue::set(Some(ServerId(
467 connection.owner_id as i32,
468 ))),
469 answering_connection_lost: ActiveValue::set(false),
470 ..Default::default()
471 })
472 .exec(&*tx)
473 .await?;
474 if participant_update.rows_affected == 0 {
475 return Err(anyhow!("room does not exist or was already joined"))?;
476 }
477
478 let mut reshared_projects = Vec::new();
479 for reshared_project in &rejoin_room.reshared_projects {
480 let project_id = ProjectId::from_proto(reshared_project.project_id);
481 let project = project::Entity::find_by_id(project_id)
482 .one(&*tx)
483 .await?
484 .ok_or_else(|| anyhow!("project does not exist"))?;
485 if project.host_user_id != Some(user_id) {
486 return Err(anyhow!("no such project"))?;
487 }
488
489 let mut collaborators = project
490 .find_related(project_collaborator::Entity)
491 .all(&*tx)
492 .await?;
493 let host_ix = collaborators
494 .iter()
495 .position(|collaborator| {
496 collaborator.user_id == user_id && collaborator.is_host
497 })
498 .ok_or_else(|| anyhow!("host not found among collaborators"))?;
499 let host = collaborators.swap_remove(host_ix);
500 let old_connection_id = host.connection();
501
502 project::Entity::update(project::ActiveModel {
503 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
504 host_connection_server_id: ActiveValue::set(Some(ServerId(
505 connection.owner_id as i32,
506 ))),
507 ..project.into_active_model()
508 })
509 .exec(&*tx)
510 .await?;
511 project_collaborator::Entity::update(project_collaborator::ActiveModel {
512 connection_id: ActiveValue::set(connection.id as i32),
513 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
514 ..host.into_active_model()
515 })
516 .exec(&*tx)
517 .await?;
518
519 self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
520 .await?;
521
522 reshared_projects.push(ResharedProject {
523 id: project_id,
524 old_connection_id,
525 collaborators: collaborators
526 .iter()
527 .map(|collaborator| ProjectCollaborator {
528 connection_id: collaborator.connection(),
529 user_id: collaborator.user_id,
530 replica_id: collaborator.replica_id,
531 is_host: collaborator.is_host,
532 })
533 .collect(),
534 worktrees: reshared_project.worktrees.clone(),
535 });
536 }
537
538 project::Entity::delete_many()
539 .filter(
540 Condition::all()
541 .add(project::Column::RoomId.eq(room_id))
542 .add(project::Column::HostUserId.eq(user_id))
543 .add(
544 project::Column::Id
545 .is_not_in(reshared_projects.iter().map(|project| project.id)),
546 ),
547 )
548 .exec(&*tx)
549 .await?;
550
551 let mut rejoined_projects = Vec::new();
552 for rejoined_project in &rejoin_room.rejoined_projects {
553 let project_id = ProjectId::from_proto(rejoined_project.id);
554 let Some(project) = project::Entity::find_by_id(project_id).one(&*tx).await? else {
555 continue;
556 };
557
558 let mut worktrees = Vec::new();
559 let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
560 for db_worktree in db_worktrees {
561 let mut worktree = RejoinedWorktree {
562 id: db_worktree.id as u64,
563 abs_path: db_worktree.abs_path,
564 root_name: db_worktree.root_name,
565 visible: db_worktree.visible,
566 updated_entries: Default::default(),
567 removed_entries: Default::default(),
568 updated_repositories: Default::default(),
569 removed_repositories: Default::default(),
570 diagnostic_summaries: Default::default(),
571 settings_files: Default::default(),
572 scan_id: db_worktree.scan_id as u64,
573 completed_scan_id: db_worktree.completed_scan_id as u64,
574 };
575
576 let rejoined_worktree = rejoined_project
577 .worktrees
578 .iter()
579 .find(|worktree| worktree.id == db_worktree.id as u64);
580
581 // File entries
582 {
583 let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
584 worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
585 } else {
586 worktree_entry::Column::IsDeleted.eq(false)
587 };
588
589 let mut db_entries = worktree_entry::Entity::find()
590 .filter(
591 Condition::all()
592 .add(worktree_entry::Column::ProjectId.eq(project.id))
593 .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
594 .add(entry_filter),
595 )
596 .stream(&*tx)
597 .await?;
598
599 while let Some(db_entry) = db_entries.next().await {
600 let db_entry = db_entry?;
601 if db_entry.is_deleted {
602 worktree.removed_entries.push(db_entry.id as u64);
603 } else {
604 worktree.updated_entries.push(proto::Entry {
605 id: db_entry.id as u64,
606 is_dir: db_entry.is_dir,
607 path: db_entry.path,
608 inode: db_entry.inode as u64,
609 mtime: Some(proto::Timestamp {
610 seconds: db_entry.mtime_seconds as u64,
611 nanos: db_entry.mtime_nanos as u32,
612 }),
613 is_symlink: db_entry.is_symlink,
614 is_ignored: db_entry.is_ignored,
615 is_external: db_entry.is_external,
616 git_status: db_entry.git_status.map(|status| status as i32),
617 });
618 }
619 }
620 }
621
622 // Repository Entries
623 {
624 let repository_entry_filter =
625 if let Some(rejoined_worktree) = rejoined_worktree {
626 worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
627 } else {
628 worktree_repository::Column::IsDeleted.eq(false)
629 };
630
631 let mut db_repositories = worktree_repository::Entity::find()
632 .filter(
633 Condition::all()
634 .add(worktree_repository::Column::ProjectId.eq(project.id))
635 .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
636 .add(repository_entry_filter),
637 )
638 .stream(&*tx)
639 .await?;
640
641 while let Some(db_repository) = db_repositories.next().await {
642 let db_repository = db_repository?;
643 if db_repository.is_deleted {
644 worktree
645 .removed_repositories
646 .push(db_repository.work_directory_id as u64);
647 } else {
648 worktree.updated_repositories.push(proto::RepositoryEntry {
649 work_directory_id: db_repository.work_directory_id as u64,
650 branch: db_repository.branch,
651 });
652 }
653 }
654 }
655
656 worktrees.push(worktree);
657 }
658
659 let language_servers = project
660 .find_related(language_server::Entity)
661 .all(&*tx)
662 .await?
663 .into_iter()
664 .map(|language_server| proto::LanguageServer {
665 id: language_server.id as u64,
666 name: language_server.name,
667 })
668 .collect::<Vec<_>>();
669
670 {
671 let mut db_settings_files = worktree_settings_file::Entity::find()
672 .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
673 .stream(&*tx)
674 .await?;
675 while let Some(db_settings_file) = db_settings_files.next().await {
676 let db_settings_file = db_settings_file?;
677 if let Some(worktree) = worktrees
678 .iter_mut()
679 .find(|w| w.id == db_settings_file.worktree_id as u64)
680 {
681 worktree.settings_files.push(WorktreeSettingsFile {
682 path: db_settings_file.path,
683 content: db_settings_file.content,
684 });
685 }
686 }
687 }
688
689 let mut collaborators = project
690 .find_related(project_collaborator::Entity)
691 .all(&*tx)
692 .await?;
693 let self_collaborator = if let Some(self_collaborator_ix) = collaborators
694 .iter()
695 .position(|collaborator| collaborator.user_id == user_id)
696 {
697 collaborators.swap_remove(self_collaborator_ix)
698 } else {
699 continue;
700 };
701 let old_connection_id = self_collaborator.connection();
702 project_collaborator::Entity::update(project_collaborator::ActiveModel {
703 connection_id: ActiveValue::set(connection.id as i32),
704 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
705 ..self_collaborator.into_active_model()
706 })
707 .exec(&*tx)
708 .await?;
709
710 let collaborators = collaborators
711 .into_iter()
712 .map(|collaborator| ProjectCollaborator {
713 connection_id: collaborator.connection(),
714 user_id: collaborator.user_id,
715 replica_id: collaborator.replica_id,
716 is_host: collaborator.is_host,
717 })
718 .collect::<Vec<_>>();
719
720 rejoined_projects.push(RejoinedProject {
721 id: project_id,
722 old_connection_id,
723 collaborators,
724 worktrees,
725 language_servers,
726 });
727 }
728
729 let (channel, room) = self.get_channel_room(room_id, &tx).await?;
730
731 Ok(RejoinedRoom {
732 room,
733 channel,
734 rejoined_projects,
735 reshared_projects,
736 })
737 })
738 .await
739 }
740
741 pub async fn leave_room(
742 &self,
743 connection: ConnectionId,
744 ) -> Result<Option<RoomGuard<LeftRoom>>> {
745 self.optional_room_transaction(|tx| async move {
746 let leaving_participant = room_participant::Entity::find()
747 .filter(
748 Condition::all()
749 .add(
750 room_participant::Column::AnsweringConnectionId
751 .eq(connection.id as i32),
752 )
753 .add(
754 room_participant::Column::AnsweringConnectionServerId
755 .eq(connection.owner_id as i32),
756 ),
757 )
758 .one(&*tx)
759 .await?;
760
761 if let Some(leaving_participant) = leaving_participant {
762 // Leave room.
763 let room_id = leaving_participant.room_id;
764 room_participant::Entity::delete_by_id(leaving_participant.id)
765 .exec(&*tx)
766 .await?;
767
768 // Cancel pending calls initiated by the leaving user.
769 let called_participants = room_participant::Entity::find()
770 .filter(
771 Condition::all()
772 .add(
773 room_participant::Column::CallingUserId
774 .eq(leaving_participant.user_id),
775 )
776 .add(room_participant::Column::AnsweringConnectionId.is_null()),
777 )
778 .all(&*tx)
779 .await?;
780 room_participant::Entity::delete_many()
781 .filter(
782 room_participant::Column::Id
783 .is_in(called_participants.iter().map(|participant| participant.id)),
784 )
785 .exec(&*tx)
786 .await?;
787 let canceled_calls_to_user_ids = called_participants
788 .into_iter()
789 .map(|participant| participant.user_id)
790 .collect();
791
792 // Detect left projects.
793 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
794 enum QueryProjectIds {
795 ProjectId,
796 }
797 let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
798 .select_only()
799 .column_as(
800 project_collaborator::Column::ProjectId,
801 QueryProjectIds::ProjectId,
802 )
803 .filter(
804 Condition::all()
805 .add(
806 project_collaborator::Column::ConnectionId.eq(connection.id as i32),
807 )
808 .add(
809 project_collaborator::Column::ConnectionServerId
810 .eq(connection.owner_id as i32),
811 ),
812 )
813 .into_values::<_, QueryProjectIds>()
814 .all(&*tx)
815 .await?;
816 let mut left_projects = HashMap::default();
817 let mut collaborators = project_collaborator::Entity::find()
818 .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
819 .stream(&*tx)
820 .await?;
821 while let Some(collaborator) = collaborators.next().await {
822 let collaborator = collaborator?;
823 let left_project =
824 left_projects
825 .entry(collaborator.project_id)
826 .or_insert(LeftProject {
827 id: collaborator.project_id,
828 host_user_id: Default::default(),
829 connection_ids: Default::default(),
830 host_connection_id: None,
831 });
832
833 let collaborator_connection_id = collaborator.connection();
834 if collaborator_connection_id != connection {
835 left_project.connection_ids.push(collaborator_connection_id);
836 }
837
838 if collaborator.is_host {
839 left_project.host_user_id = Some(collaborator.user_id);
840 left_project.host_connection_id = Some(collaborator_connection_id);
841 }
842 }
843 drop(collaborators);
844
845 // Leave projects.
846 project_collaborator::Entity::delete_many()
847 .filter(
848 Condition::all()
849 .add(
850 project_collaborator::Column::ConnectionId.eq(connection.id as i32),
851 )
852 .add(
853 project_collaborator::Column::ConnectionServerId
854 .eq(connection.owner_id as i32),
855 ),
856 )
857 .exec(&*tx)
858 .await?;
859
860 follower::Entity::delete_many()
861 .filter(
862 Condition::all()
863 .add(follower::Column::FollowerConnectionId.eq(connection.id as i32)),
864 )
865 .exec(&*tx)
866 .await?;
867
868 // Unshare projects.
869 project::Entity::delete_many()
870 .filter(
871 Condition::all()
872 .add(project::Column::RoomId.eq(room_id))
873 .add(project::Column::HostConnectionId.eq(connection.id as i32))
874 .add(
875 project::Column::HostConnectionServerId
876 .eq(connection.owner_id as i32),
877 ),
878 )
879 .exec(&*tx)
880 .await?;
881
882 let (channel, room) = self.get_channel_room(room_id, &tx).await?;
883 let deleted = if room.participants.is_empty() {
884 let result = room::Entity::delete_by_id(room_id).exec(&*tx).await?;
885 result.rows_affected > 0
886 } else {
887 false
888 };
889
890 let left_room = LeftRoom {
891 room,
892 channel,
893 left_projects,
894 canceled_calls_to_user_ids,
895 deleted,
896 };
897
898 if left_room.room.participants.is_empty() {
899 self.rooms.remove(&room_id);
900 }
901
902 Ok(Some((room_id, left_room)))
903 } else {
904 Ok(None)
905 }
906 })
907 .await
908 }
909
910 /// Updates the location of a participant in the given room.
911 pub async fn update_room_participant_location(
912 &self,
913 room_id: RoomId,
914 connection: ConnectionId,
915 location: proto::ParticipantLocation,
916 ) -> Result<RoomGuard<proto::Room>> {
917 self.room_transaction(room_id, |tx| async {
918 let tx = tx;
919 let location_kind;
920 let location_project_id;
921 match location
922 .variant
923 .as_ref()
924 .ok_or_else(|| anyhow!("invalid location"))?
925 {
926 proto::participant_location::Variant::SharedProject(project) => {
927 location_kind = 0;
928 location_project_id = Some(ProjectId::from_proto(project.id));
929 }
930 proto::participant_location::Variant::UnsharedProject(_) => {
931 location_kind = 1;
932 location_project_id = None;
933 }
934 proto::participant_location::Variant::External(_) => {
935 location_kind = 2;
936 location_project_id = None;
937 }
938 }
939
940 let result = room_participant::Entity::update_many()
941 .filter(
942 Condition::all()
943 .add(room_participant::Column::RoomId.eq(room_id))
944 .add(
945 room_participant::Column::AnsweringConnectionId
946 .eq(connection.id as i32),
947 )
948 .add(
949 room_participant::Column::AnsweringConnectionServerId
950 .eq(connection.owner_id as i32),
951 ),
952 )
953 .set(room_participant::ActiveModel {
954 location_kind: ActiveValue::set(Some(location_kind)),
955 location_project_id: ActiveValue::set(location_project_id),
956 ..Default::default()
957 })
958 .exec(&*tx)
959 .await?;
960
961 if result.rows_affected == 1 {
962 let room = self.get_room(room_id, &tx).await?;
963 Ok(room)
964 } else {
965 Err(anyhow!("could not update room participant location"))?
966 }
967 })
968 .await
969 }
970
971 /// Sets the role of a participant in the given room.
972 pub async fn set_room_participant_role(
973 &self,
974 admin_id: UserId,
975 room_id: RoomId,
976 user_id: UserId,
977 role: ChannelRole,
978 ) -> Result<RoomGuard<proto::Room>> {
979 self.room_transaction(room_id, |tx| async move {
980 room_participant::Entity::find()
981 .filter(
982 Condition::all()
983 .add(room_participant::Column::RoomId.eq(room_id))
984 .add(room_participant::Column::UserId.eq(admin_id))
985 .add(room_participant::Column::Role.eq(ChannelRole::Admin)),
986 )
987 .one(&*tx)
988 .await?
989 .ok_or_else(|| anyhow!("only admins can set participant role"))?;
990
991 if role.requires_cla() {
992 self.check_user_has_signed_cla(user_id, room_id, &tx)
993 .await?;
994 }
995
996 let result = room_participant::Entity::update_many()
997 .filter(
998 Condition::all()
999 .add(room_participant::Column::RoomId.eq(room_id))
1000 .add(room_participant::Column::UserId.eq(user_id)),
1001 )
1002 .set(room_participant::ActiveModel {
1003 role: ActiveValue::set(Some(role)),
1004 ..Default::default()
1005 })
1006 .exec(&*tx)
1007 .await?;
1008
1009 if result.rows_affected != 1 {
1010 Err(anyhow!("could not update room participant role"))?;
1011 }
1012 self.get_room(room_id, &tx).await
1013 })
1014 .await
1015 }
1016
1017 async fn check_user_has_signed_cla(
1018 &self,
1019 user_id: UserId,
1020 room_id: RoomId,
1021 tx: &DatabaseTransaction,
1022 ) -> Result<()> {
1023 let channel = room::Entity::find_by_id(room_id)
1024 .one(tx)
1025 .await?
1026 .ok_or_else(|| anyhow!("could not find room"))?
1027 .find_related(channel::Entity)
1028 .one(tx)
1029 .await?;
1030
1031 if let Some(channel) = channel {
1032 let requires_zed_cla = channel.requires_zed_cla
1033 || channel::Entity::find()
1034 .filter(
1035 channel::Column::Id
1036 .is_in(channel.ancestors())
1037 .and(channel::Column::RequiresZedCla.eq(true)),
1038 )
1039 .count(tx)
1040 .await?
1041 > 0;
1042 if requires_zed_cla {
1043 if contributor::Entity::find()
1044 .filter(contributor::Column::UserId.eq(user_id))
1045 .one(tx)
1046 .await?
1047 .is_none()
1048 {
1049 Err(anyhow!("user has not signed the Zed CLA"))?;
1050 }
1051 }
1052 }
1053 Ok(())
1054 }
1055
1056 pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
1057 self.transaction(|tx| async move {
1058 self.room_connection_lost(connection, &tx).await?;
1059 self.channel_buffer_connection_lost(connection, &tx).await?;
1060 self.channel_chat_connection_lost(connection, &tx).await?;
1061 Ok(())
1062 })
1063 .await
1064 }
1065
1066 pub async fn room_connection_lost(
1067 &self,
1068 connection: ConnectionId,
1069 tx: &DatabaseTransaction,
1070 ) -> Result<()> {
1071 let participant = room_participant::Entity::find()
1072 .filter(
1073 Condition::all()
1074 .add(room_participant::Column::AnsweringConnectionId.eq(connection.id as i32))
1075 .add(
1076 room_participant::Column::AnsweringConnectionServerId
1077 .eq(connection.owner_id as i32),
1078 ),
1079 )
1080 .one(tx)
1081 .await?;
1082
1083 if let Some(participant) = participant {
1084 room_participant::Entity::update(room_participant::ActiveModel {
1085 answering_connection_lost: ActiveValue::set(true),
1086 ..participant.into_active_model()
1087 })
1088 .exec(tx)
1089 .await?;
1090 }
1091 Ok(())
1092 }
1093
1094 fn build_incoming_call(
1095 room: &proto::Room,
1096 called_user_id: UserId,
1097 ) -> Option<proto::IncomingCall> {
1098 let pending_participant = room
1099 .pending_participants
1100 .iter()
1101 .find(|participant| participant.user_id == called_user_id.to_proto())?;
1102
1103 Some(proto::IncomingCall {
1104 room_id: room.id,
1105 calling_user_id: pending_participant.calling_user_id,
1106 participant_user_ids: room
1107 .participants
1108 .iter()
1109 .map(|participant| participant.user_id)
1110 .collect(),
1111 initial_project: room.participants.iter().find_map(|participant| {
1112 let initial_project_id = pending_participant.initial_project_id?;
1113 participant
1114 .projects
1115 .iter()
1116 .find(|project| project.id == initial_project_id)
1117 .cloned()
1118 }),
1119 })
1120 }
1121
1122 pub async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1123 let (_, room) = self.get_channel_room(room_id, tx).await?;
1124 Ok(room)
1125 }
1126
1127 pub async fn room_connection_ids(
1128 &self,
1129 room_id: RoomId,
1130 connection_id: ConnectionId,
1131 ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
1132 self.room_transaction(room_id, |tx| async move {
1133 let mut participants = room_participant::Entity::find()
1134 .filter(room_participant::Column::RoomId.eq(room_id))
1135 .stream(&*tx)
1136 .await?;
1137
1138 let mut is_participant = false;
1139 let mut connection_ids = HashSet::default();
1140 while let Some(participant) = participants.next().await {
1141 let participant = participant?;
1142 if let Some(answering_connection) = participant.answering_connection() {
1143 if answering_connection == connection_id {
1144 is_participant = true;
1145 } else {
1146 connection_ids.insert(answering_connection);
1147 }
1148 }
1149 }
1150
1151 if !is_participant {
1152 Err(anyhow!("not a room participant"))?;
1153 }
1154
1155 Ok(connection_ids)
1156 })
1157 .await
1158 }
1159
1160 async fn get_channel_room(
1161 &self,
1162 room_id: RoomId,
1163 tx: &DatabaseTransaction,
1164 ) -> Result<(Option<channel::Model>, proto::Room)> {
1165 let db_room = room::Entity::find_by_id(room_id)
1166 .one(tx)
1167 .await?
1168 .ok_or_else(|| anyhow!("could not find room"))?;
1169
1170 let mut db_participants = db_room
1171 .find_related(room_participant::Entity)
1172 .stream(tx)
1173 .await?;
1174 let mut participants = HashMap::default();
1175 let mut pending_participants = Vec::new();
1176 while let Some(db_participant) = db_participants.next().await {
1177 let db_participant = db_participant?;
1178 if let (
1179 Some(answering_connection_id),
1180 Some(answering_connection_server_id),
1181 Some(participant_index),
1182 ) = (
1183 db_participant.answering_connection_id,
1184 db_participant.answering_connection_server_id,
1185 db_participant.participant_index,
1186 ) {
1187 let location = match (
1188 db_participant.location_kind,
1189 db_participant.location_project_id,
1190 ) {
1191 (Some(0), Some(project_id)) => {
1192 Some(proto::participant_location::Variant::SharedProject(
1193 proto::participant_location::SharedProject {
1194 id: project_id.to_proto(),
1195 },
1196 ))
1197 }
1198 (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1199 Default::default(),
1200 )),
1201 _ => Some(proto::participant_location::Variant::External(
1202 Default::default(),
1203 )),
1204 };
1205
1206 let answering_connection = ConnectionId {
1207 owner_id: answering_connection_server_id.0 as u32,
1208 id: answering_connection_id as u32,
1209 };
1210 participants.insert(
1211 answering_connection,
1212 proto::Participant {
1213 user_id: db_participant.user_id.to_proto(),
1214 peer_id: Some(answering_connection.into()),
1215 projects: Default::default(),
1216 location: Some(proto::ParticipantLocation { variant: location }),
1217 participant_index: participant_index as u32,
1218 role: db_participant.role.unwrap_or(ChannelRole::Member).into(),
1219 },
1220 );
1221 } else {
1222 pending_participants.push(proto::PendingParticipant {
1223 user_id: db_participant.user_id.to_proto(),
1224 calling_user_id: db_participant.calling_user_id.to_proto(),
1225 initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1226 });
1227 }
1228 }
1229 drop(db_participants);
1230
1231 let mut db_projects = db_room
1232 .find_related(project::Entity)
1233 .find_with_related(worktree::Entity)
1234 .stream(tx)
1235 .await?;
1236
1237 while let Some(row) = db_projects.next().await {
1238 let (db_project, db_worktree) = row?;
1239 let host_connection = db_project.host_connection()?;
1240 if let Some(participant) = participants.get_mut(&host_connection) {
1241 let project = if let Some(project) = participant
1242 .projects
1243 .iter_mut()
1244 .find(|project| project.id == db_project.id.to_proto())
1245 {
1246 project
1247 } else {
1248 participant.projects.push(proto::ParticipantProject {
1249 id: db_project.id.to_proto(),
1250 worktree_root_names: Default::default(),
1251 });
1252 participant.projects.last_mut().unwrap()
1253 };
1254
1255 if let Some(db_worktree) = db_worktree {
1256 if db_worktree.visible {
1257 project.worktree_root_names.push(db_worktree.root_name);
1258 }
1259 }
1260 }
1261 }
1262 drop(db_projects);
1263
1264 let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
1265 let mut followers = Vec::new();
1266 while let Some(db_follower) = db_followers.next().await {
1267 let db_follower = db_follower?;
1268 followers.push(proto::Follower {
1269 leader_id: Some(db_follower.leader_connection().into()),
1270 follower_id: Some(db_follower.follower_connection().into()),
1271 project_id: db_follower.project_id.to_proto(),
1272 });
1273 }
1274 drop(db_followers);
1275
1276 let channel = if let Some(channel_id) = db_room.channel_id {
1277 Some(self.get_channel_internal(channel_id, tx).await?)
1278 } else {
1279 None
1280 };
1281
1282 Ok((
1283 channel,
1284 proto::Room {
1285 id: db_room.id.to_proto(),
1286 live_kit_room: db_room.live_kit_room,
1287 participants: participants.into_values().collect(),
1288 pending_participants,
1289 followers,
1290 },
1291 ))
1292 }
1293}