1use super::*;
2
3impl Database {
4 /// Clears all room participants in rooms attached to a stale server.
5 pub async fn clear_stale_room_participants(
6 &self,
7 room_id: RoomId,
8 new_server_id: ServerId,
9 ) -> Result<RoomGuard<RefreshedRoom>> {
10 self.room_transaction(room_id, |tx| async move {
11 let stale_participant_filter = Condition::all()
12 .add(room_participant::Column::RoomId.eq(room_id))
13 .add(room_participant::Column::AnsweringConnectionId.is_not_null())
14 .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
15
16 let stale_participant_user_ids = room_participant::Entity::find()
17 .filter(stale_participant_filter.clone())
18 .all(&*tx)
19 .await?
20 .into_iter()
21 .map(|participant| participant.user_id)
22 .collect::<Vec<_>>();
23
24 // Delete participants who failed to reconnect and cancel their calls.
25 let mut canceled_calls_to_user_ids = Vec::new();
26 room_participant::Entity::delete_many()
27 .filter(stale_participant_filter)
28 .exec(&*tx)
29 .await?;
30 let called_participants = room_participant::Entity::find()
31 .filter(
32 Condition::all()
33 .add(
34 room_participant::Column::CallingUserId
35 .is_in(stale_participant_user_ids.iter().copied()),
36 )
37 .add(room_participant::Column::AnsweringConnectionId.is_null()),
38 )
39 .all(&*tx)
40 .await?;
41 room_participant::Entity::delete_many()
42 .filter(
43 room_participant::Column::Id
44 .is_in(called_participants.iter().map(|participant| participant.id)),
45 )
46 .exec(&*tx)
47 .await?;
48 canceled_calls_to_user_ids.extend(
49 called_participants
50 .into_iter()
51 .map(|participant| participant.user_id),
52 );
53
54 let (channel, room) = self.get_channel_room(room_id, &tx).await?;
55 let channel_members;
56 if let Some(channel) = &channel {
57 channel_members = self.get_channel_participants(channel, &tx).await?;
58 } else {
59 channel_members = Vec::new();
60
61 // Delete the room if it becomes empty.
62 if room.participants.is_empty() {
63 project::Entity::delete_many()
64 .filter(project::Column::RoomId.eq(room_id))
65 .exec(&*tx)
66 .await?;
67 room::Entity::delete_by_id(room_id).exec(&*tx).await?;
68 }
69 };
70
71 Ok(RefreshedRoom {
72 room,
73 channel_id: channel.map(|channel| channel.id),
74 channel_members,
75 stale_participant_user_ids,
76 canceled_calls_to_user_ids,
77 })
78 })
79 .await
80 }
81
82 /// Returns the incoming calls for user with the given ID.
83 pub async fn incoming_call_for_user(
84 &self,
85 user_id: UserId,
86 ) -> Result<Option<proto::IncomingCall>> {
87 self.transaction(|tx| async move {
88 let pending_participant = room_participant::Entity::find()
89 .filter(
90 room_participant::Column::UserId
91 .eq(user_id)
92 .and(room_participant::Column::AnsweringConnectionId.is_null()),
93 )
94 .one(&*tx)
95 .await?;
96
97 if let Some(pending_participant) = pending_participant {
98 let room = self.get_room(pending_participant.room_id, &tx).await?;
99 Ok(Self::build_incoming_call(&room, user_id))
100 } else {
101 Ok(None)
102 }
103 })
104 .await
105 }
106
107 /// Creates a new room.
108 pub async fn create_room(
109 &self,
110 user_id: UserId,
111 connection: ConnectionId,
112 live_kit_room: &str,
113 ) -> Result<proto::Room> {
114 self.transaction(|tx| async move {
115 let room = room::ActiveModel {
116 live_kit_room: ActiveValue::set(live_kit_room.into()),
117 ..Default::default()
118 }
119 .insert(&*tx)
120 .await?;
121 room_participant::ActiveModel {
122 room_id: ActiveValue::set(room.id),
123 user_id: ActiveValue::set(user_id),
124 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
125 answering_connection_server_id: ActiveValue::set(Some(ServerId(
126 connection.owner_id as i32,
127 ))),
128 answering_connection_lost: ActiveValue::set(false),
129 calling_user_id: ActiveValue::set(user_id),
130 calling_connection_id: ActiveValue::set(connection.id as i32),
131 calling_connection_server_id: ActiveValue::set(Some(ServerId(
132 connection.owner_id as i32,
133 ))),
134 participant_index: ActiveValue::set(Some(0)),
135 role: ActiveValue::set(Some(ChannelRole::Admin)),
136
137 id: ActiveValue::NotSet,
138 location_kind: ActiveValue::NotSet,
139 location_project_id: ActiveValue::NotSet,
140 initial_project_id: ActiveValue::NotSet,
141 }
142 .insert(&*tx)
143 .await?;
144
145 let room = self.get_room(room.id, &tx).await?;
146 Ok(room)
147 })
148 .await
149 }
150
151 pub async fn call(
152 &self,
153 room_id: RoomId,
154 calling_user_id: UserId,
155 calling_connection: ConnectionId,
156 called_user_id: UserId,
157 initial_project_id: Option<ProjectId>,
158 ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
159 self.room_transaction(room_id, |tx| async move {
160 let caller = room_participant::Entity::find()
161 .filter(
162 room_participant::Column::UserId
163 .eq(calling_user_id)
164 .and(room_participant::Column::RoomId.eq(room_id)),
165 )
166 .one(&*tx)
167 .await?
168 .ok_or_else(|| anyhow!("user is not in the room"))?;
169
170 let called_user_role = match caller.role.unwrap_or(ChannelRole::Member) {
171 ChannelRole::Admin | ChannelRole::Member => ChannelRole::Member,
172 ChannelRole::Guest | ChannelRole::Talker => ChannelRole::Guest,
173 ChannelRole::Banned => return Err(anyhow!("banned users cannot invite").into()),
174 };
175
176 room_participant::ActiveModel {
177 room_id: ActiveValue::set(room_id),
178 user_id: ActiveValue::set(called_user_id),
179 answering_connection_lost: ActiveValue::set(false),
180 participant_index: ActiveValue::NotSet,
181 calling_user_id: ActiveValue::set(calling_user_id),
182 calling_connection_id: ActiveValue::set(calling_connection.id as i32),
183 calling_connection_server_id: ActiveValue::set(Some(ServerId(
184 calling_connection.owner_id as i32,
185 ))),
186 initial_project_id: ActiveValue::set(initial_project_id),
187 role: ActiveValue::set(Some(called_user_role)),
188
189 id: ActiveValue::NotSet,
190 answering_connection_id: ActiveValue::NotSet,
191 answering_connection_server_id: ActiveValue::NotSet,
192 location_kind: ActiveValue::NotSet,
193 location_project_id: ActiveValue::NotSet,
194 }
195 .insert(&*tx)
196 .await?;
197
198 let room = self.get_room(room_id, &tx).await?;
199 let incoming_call = Self::build_incoming_call(&room, called_user_id)
200 .ok_or_else(|| anyhow!("failed to build incoming call"))?;
201 Ok((room, incoming_call))
202 })
203 .await
204 }
205
206 pub async fn call_failed(
207 &self,
208 room_id: RoomId,
209 called_user_id: UserId,
210 ) -> Result<RoomGuard<proto::Room>> {
211 self.room_transaction(room_id, |tx| async move {
212 room_participant::Entity::delete_many()
213 .filter(
214 room_participant::Column::RoomId
215 .eq(room_id)
216 .and(room_participant::Column::UserId.eq(called_user_id)),
217 )
218 .exec(&*tx)
219 .await?;
220 let room = self.get_room(room_id, &tx).await?;
221 Ok(room)
222 })
223 .await
224 }
225
226 pub async fn decline_call(
227 &self,
228 expected_room_id: Option<RoomId>,
229 user_id: UserId,
230 ) -> Result<Option<RoomGuard<proto::Room>>> {
231 self.optional_room_transaction(|tx| async move {
232 let mut filter = Condition::all()
233 .add(room_participant::Column::UserId.eq(user_id))
234 .add(room_participant::Column::AnsweringConnectionId.is_null());
235 if let Some(room_id) = expected_room_id {
236 filter = filter.add(room_participant::Column::RoomId.eq(room_id));
237 }
238 let participant = room_participant::Entity::find()
239 .filter(filter)
240 .one(&*tx)
241 .await?;
242
243 let participant = if let Some(participant) = participant {
244 participant
245 } else if expected_room_id.is_some() {
246 return Err(anyhow!("could not find call to decline"))?;
247 } else {
248 return Ok(None);
249 };
250
251 let room_id = participant.room_id;
252 room_participant::Entity::delete(participant.into_active_model())
253 .exec(&*tx)
254 .await?;
255
256 let room = self.get_room(room_id, &tx).await?;
257 Ok(Some((room_id, room)))
258 })
259 .await
260 }
261
262 pub async fn cancel_call(
263 &self,
264 room_id: RoomId,
265 calling_connection: ConnectionId,
266 called_user_id: UserId,
267 ) -> Result<RoomGuard<proto::Room>> {
268 self.room_transaction(room_id, |tx| async move {
269 let participant = room_participant::Entity::find()
270 .filter(
271 Condition::all()
272 .add(room_participant::Column::UserId.eq(called_user_id))
273 .add(room_participant::Column::RoomId.eq(room_id))
274 .add(
275 room_participant::Column::CallingConnectionId
276 .eq(calling_connection.id as i32),
277 )
278 .add(
279 room_participant::Column::CallingConnectionServerId
280 .eq(calling_connection.owner_id as i32),
281 )
282 .add(room_participant::Column::AnsweringConnectionId.is_null()),
283 )
284 .one(&*tx)
285 .await?
286 .ok_or_else(|| anyhow!("no call to cancel"))?;
287
288 room_participant::Entity::delete(participant.into_active_model())
289 .exec(&*tx)
290 .await?;
291
292 let room = self.get_room(room_id, &tx).await?;
293 Ok(room)
294 })
295 .await
296 }
297
298 pub async fn join_room(
299 &self,
300 room_id: RoomId,
301 user_id: UserId,
302 connection: ConnectionId,
303 ) -> Result<RoomGuard<JoinRoom>> {
304 self.room_transaction(room_id, |tx| async move {
305 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
306 enum QueryChannelId {
307 ChannelId,
308 }
309
310 let channel_id: Option<ChannelId> = room::Entity::find()
311 .select_only()
312 .column(room::Column::ChannelId)
313 .filter(room::Column::Id.eq(room_id))
314 .into_values::<_, QueryChannelId>()
315 .one(&*tx)
316 .await?
317 .ok_or_else(|| anyhow!("no such room"))?;
318
319 if channel_id.is_some() {
320 Err(anyhow!("tried to join channel call directly"))?
321 }
322
323 let participant_index = self
324 .get_next_participant_index_internal(room_id, &tx)
325 .await?;
326
327 let result = room_participant::Entity::update_many()
328 .filter(
329 Condition::all()
330 .add(room_participant::Column::RoomId.eq(room_id))
331 .add(room_participant::Column::UserId.eq(user_id))
332 .add(room_participant::Column::AnsweringConnectionId.is_null()),
333 )
334 .set(room_participant::ActiveModel {
335 participant_index: ActiveValue::Set(Some(participant_index)),
336 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
337 answering_connection_server_id: ActiveValue::set(Some(ServerId(
338 connection.owner_id as i32,
339 ))),
340 answering_connection_lost: ActiveValue::set(false),
341 ..Default::default()
342 })
343 .exec(&*tx)
344 .await?;
345 if result.rows_affected == 0 {
346 Err(anyhow!("room does not exist or was already joined"))?;
347 }
348
349 let room = self.get_room(room_id, &tx).await?;
350 Ok(JoinRoom {
351 room,
352 channel_id: None,
353 channel_members: vec![],
354 })
355 })
356 .await
357 }
358
359 async fn get_next_participant_index_internal(
360 &self,
361 room_id: RoomId,
362 tx: &DatabaseTransaction,
363 ) -> Result<i32> {
364 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
365 enum QueryParticipantIndices {
366 ParticipantIndex,
367 }
368 let existing_participant_indices: Vec<i32> = room_participant::Entity::find()
369 .filter(
370 room_participant::Column::RoomId
371 .eq(room_id)
372 .and(room_participant::Column::ParticipantIndex.is_not_null()),
373 )
374 .select_only()
375 .column(room_participant::Column::ParticipantIndex)
376 .into_values::<_, QueryParticipantIndices>()
377 .all(tx)
378 .await?;
379
380 let mut participant_index = 0;
381 while existing_participant_indices.contains(&participant_index) {
382 participant_index += 1;
383 }
384
385 Ok(participant_index)
386 }
387
388 /// Returns the channel ID for the given room, if it has one.
389 pub async fn channel_id_for_room(&self, room_id: RoomId) -> Result<Option<ChannelId>> {
390 self.transaction(|tx| async move {
391 let room: Option<room::Model> = room::Entity::find()
392 .filter(room::Column::Id.eq(room_id))
393 .one(&*tx)
394 .await?;
395
396 Ok(room.and_then(|room| room.channel_id))
397 })
398 .await
399 }
400
401 pub(crate) async fn join_channel_room_internal(
402 &self,
403 room_id: RoomId,
404 user_id: UserId,
405 connection: ConnectionId,
406 role: ChannelRole,
407 tx: &DatabaseTransaction,
408 ) -> Result<JoinRoom> {
409 let participant_index = self
410 .get_next_participant_index_internal(room_id, tx)
411 .await?;
412
413 room_participant::Entity::insert_many([room_participant::ActiveModel {
414 room_id: ActiveValue::set(room_id),
415 user_id: ActiveValue::set(user_id),
416 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
417 answering_connection_server_id: ActiveValue::set(Some(ServerId(
418 connection.owner_id as i32,
419 ))),
420 answering_connection_lost: ActiveValue::set(false),
421 calling_user_id: ActiveValue::set(user_id),
422 calling_connection_id: ActiveValue::set(connection.id as i32),
423 calling_connection_server_id: ActiveValue::set(Some(ServerId(
424 connection.owner_id as i32,
425 ))),
426 participant_index: ActiveValue::Set(Some(participant_index)),
427 role: ActiveValue::set(Some(role)),
428 id: ActiveValue::NotSet,
429 location_kind: ActiveValue::NotSet,
430 location_project_id: ActiveValue::NotSet,
431 initial_project_id: ActiveValue::NotSet,
432 }])
433 .on_conflict(
434 OnConflict::columns([room_participant::Column::UserId])
435 .update_columns([
436 room_participant::Column::AnsweringConnectionId,
437 room_participant::Column::AnsweringConnectionServerId,
438 room_participant::Column::AnsweringConnectionLost,
439 room_participant::Column::ParticipantIndex,
440 room_participant::Column::Role,
441 ])
442 .to_owned(),
443 )
444 .exec(tx)
445 .await?;
446
447 let (channel, room) = self.get_channel_room(room_id, &tx).await?;
448 let channel = channel.ok_or_else(|| anyhow!("no channel for room"))?;
449 let channel_members = self.get_channel_participants(&channel, tx).await?;
450 Ok(JoinRoom {
451 room,
452 channel_id: Some(channel.id),
453 channel_members,
454 })
455 }
456
457 pub async fn rejoin_room(
458 &self,
459 rejoin_room: proto::RejoinRoom,
460 user_id: UserId,
461 connection: ConnectionId,
462 ) -> Result<RoomGuard<RejoinedRoom>> {
463 let room_id = RoomId::from_proto(rejoin_room.id);
464 self.room_transaction(room_id, |tx| async {
465 let tx = tx;
466 let participant_update = room_participant::Entity::update_many()
467 .filter(
468 Condition::all()
469 .add(room_participant::Column::RoomId.eq(room_id))
470 .add(room_participant::Column::UserId.eq(user_id))
471 .add(room_participant::Column::AnsweringConnectionId.is_not_null()),
472 )
473 .set(room_participant::ActiveModel {
474 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
475 answering_connection_server_id: ActiveValue::set(Some(ServerId(
476 connection.owner_id as i32,
477 ))),
478 answering_connection_lost: ActiveValue::set(false),
479 ..Default::default()
480 })
481 .exec(&*tx)
482 .await?;
483 if participant_update.rows_affected == 0 {
484 return Err(anyhow!("room does not exist or was already joined"))?;
485 }
486
487 let mut reshared_projects = Vec::new();
488 for reshared_project in &rejoin_room.reshared_projects {
489 let project_id = ProjectId::from_proto(reshared_project.project_id);
490 let project = project::Entity::find_by_id(project_id)
491 .one(&*tx)
492 .await?
493 .ok_or_else(|| anyhow!("project does not exist"))?;
494 if project.host_user_id != Some(user_id) {
495 return Err(anyhow!("no such project"))?;
496 }
497
498 let mut collaborators = project
499 .find_related(project_collaborator::Entity)
500 .all(&*tx)
501 .await?;
502 let host_ix = collaborators
503 .iter()
504 .position(|collaborator| {
505 collaborator.user_id == user_id && collaborator.is_host
506 })
507 .ok_or_else(|| anyhow!("host not found among collaborators"))?;
508 let host = collaborators.swap_remove(host_ix);
509 let old_connection_id = host.connection();
510
511 project::Entity::update(project::ActiveModel {
512 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
513 host_connection_server_id: ActiveValue::set(Some(ServerId(
514 connection.owner_id as i32,
515 ))),
516 ..project.into_active_model()
517 })
518 .exec(&*tx)
519 .await?;
520 project_collaborator::Entity::update(project_collaborator::ActiveModel {
521 connection_id: ActiveValue::set(connection.id as i32),
522 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
523 ..host.into_active_model()
524 })
525 .exec(&*tx)
526 .await?;
527
528 self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
529 .await?;
530
531 reshared_projects.push(ResharedProject {
532 id: project_id,
533 old_connection_id,
534 collaborators: collaborators
535 .iter()
536 .map(|collaborator| ProjectCollaborator {
537 connection_id: collaborator.connection(),
538 user_id: collaborator.user_id,
539 replica_id: collaborator.replica_id,
540 is_host: collaborator.is_host,
541 })
542 .collect(),
543 worktrees: reshared_project.worktrees.clone(),
544 });
545 }
546
547 project::Entity::delete_many()
548 .filter(
549 Condition::all()
550 .add(project::Column::RoomId.eq(room_id))
551 .add(project::Column::HostUserId.eq(user_id))
552 .add(
553 project::Column::Id
554 .is_not_in(reshared_projects.iter().map(|project| project.id)),
555 ),
556 )
557 .exec(&*tx)
558 .await?;
559
560 let mut rejoined_projects = Vec::new();
561 for rejoined_project in &rejoin_room.rejoined_projects {
562 let project_id = ProjectId::from_proto(rejoined_project.id);
563 let Some(project) = project::Entity::find_by_id(project_id).one(&*tx).await? else {
564 continue;
565 };
566
567 let mut worktrees = Vec::new();
568 let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
569 for db_worktree in db_worktrees {
570 let mut worktree = RejoinedWorktree {
571 id: db_worktree.id as u64,
572 abs_path: db_worktree.abs_path,
573 root_name: db_worktree.root_name,
574 visible: db_worktree.visible,
575 updated_entries: Default::default(),
576 removed_entries: Default::default(),
577 updated_repositories: Default::default(),
578 removed_repositories: Default::default(),
579 diagnostic_summaries: Default::default(),
580 settings_files: Default::default(),
581 scan_id: db_worktree.scan_id as u64,
582 completed_scan_id: db_worktree.completed_scan_id as u64,
583 };
584
585 let rejoined_worktree = rejoined_project
586 .worktrees
587 .iter()
588 .find(|worktree| worktree.id == db_worktree.id as u64);
589
590 // File entries
591 {
592 let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
593 worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
594 } else {
595 worktree_entry::Column::IsDeleted.eq(false)
596 };
597
598 let mut db_entries = worktree_entry::Entity::find()
599 .filter(
600 Condition::all()
601 .add(worktree_entry::Column::ProjectId.eq(project.id))
602 .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
603 .add(entry_filter),
604 )
605 .stream(&*tx)
606 .await?;
607
608 while let Some(db_entry) = db_entries.next().await {
609 let db_entry = db_entry?;
610 if db_entry.is_deleted {
611 worktree.removed_entries.push(db_entry.id as u64);
612 } else {
613 worktree.updated_entries.push(proto::Entry {
614 id: db_entry.id as u64,
615 is_dir: db_entry.is_dir,
616 path: db_entry.path,
617 inode: db_entry.inode as u64,
618 mtime: Some(proto::Timestamp {
619 seconds: db_entry.mtime_seconds as u64,
620 nanos: db_entry.mtime_nanos as u32,
621 }),
622 is_symlink: db_entry.is_symlink,
623 is_ignored: db_entry.is_ignored,
624 is_external: db_entry.is_external,
625 git_status: db_entry.git_status.map(|status| status as i32),
626 });
627 }
628 }
629 }
630
631 // Repository Entries
632 {
633 let repository_entry_filter =
634 if let Some(rejoined_worktree) = rejoined_worktree {
635 worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
636 } else {
637 worktree_repository::Column::IsDeleted.eq(false)
638 };
639
640 let mut db_repositories = worktree_repository::Entity::find()
641 .filter(
642 Condition::all()
643 .add(worktree_repository::Column::ProjectId.eq(project.id))
644 .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
645 .add(repository_entry_filter),
646 )
647 .stream(&*tx)
648 .await?;
649
650 while let Some(db_repository) = db_repositories.next().await {
651 let db_repository = db_repository?;
652 if db_repository.is_deleted {
653 worktree
654 .removed_repositories
655 .push(db_repository.work_directory_id as u64);
656 } else {
657 worktree.updated_repositories.push(proto::RepositoryEntry {
658 work_directory_id: db_repository.work_directory_id as u64,
659 branch: db_repository.branch,
660 });
661 }
662 }
663 }
664
665 worktrees.push(worktree);
666 }
667
668 let language_servers = project
669 .find_related(language_server::Entity)
670 .all(&*tx)
671 .await?
672 .into_iter()
673 .map(|language_server| proto::LanguageServer {
674 id: language_server.id as u64,
675 name: language_server.name,
676 })
677 .collect::<Vec<_>>();
678
679 {
680 let mut db_settings_files = worktree_settings_file::Entity::find()
681 .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
682 .stream(&*tx)
683 .await?;
684 while let Some(db_settings_file) = db_settings_files.next().await {
685 let db_settings_file = db_settings_file?;
686 if let Some(worktree) = worktrees
687 .iter_mut()
688 .find(|w| w.id == db_settings_file.worktree_id as u64)
689 {
690 worktree.settings_files.push(WorktreeSettingsFile {
691 path: db_settings_file.path,
692 content: db_settings_file.content,
693 });
694 }
695 }
696 }
697
698 let mut collaborators = project
699 .find_related(project_collaborator::Entity)
700 .all(&*tx)
701 .await?;
702 let self_collaborator = if let Some(self_collaborator_ix) = collaborators
703 .iter()
704 .position(|collaborator| collaborator.user_id == user_id)
705 {
706 collaborators.swap_remove(self_collaborator_ix)
707 } else {
708 continue;
709 };
710 let old_connection_id = self_collaborator.connection();
711 project_collaborator::Entity::update(project_collaborator::ActiveModel {
712 connection_id: ActiveValue::set(connection.id as i32),
713 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
714 ..self_collaborator.into_active_model()
715 })
716 .exec(&*tx)
717 .await?;
718
719 let collaborators = collaborators
720 .into_iter()
721 .map(|collaborator| ProjectCollaborator {
722 connection_id: collaborator.connection(),
723 user_id: collaborator.user_id,
724 replica_id: collaborator.replica_id,
725 is_host: collaborator.is_host,
726 })
727 .collect::<Vec<_>>();
728
729 rejoined_projects.push(RejoinedProject {
730 id: project_id,
731 old_connection_id,
732 collaborators,
733 worktrees,
734 language_servers,
735 });
736 }
737
738 let (channel, room) = self.get_channel_room(room_id, &tx).await?;
739 let channel_members = if let Some(channel) = &channel {
740 self.get_channel_participants(&channel, &tx).await?
741 } else {
742 Vec::new()
743 };
744
745 Ok(RejoinedRoom {
746 room,
747 channel_id: channel.map(|channel| channel.id),
748 channel_members,
749 rejoined_projects,
750 reshared_projects,
751 })
752 })
753 .await
754 }
755
756 pub async fn leave_room(
757 &self,
758 connection: ConnectionId,
759 ) -> Result<Option<RoomGuard<LeftRoom>>> {
760 self.optional_room_transaction(|tx| async move {
761 let leaving_participant = room_participant::Entity::find()
762 .filter(
763 Condition::all()
764 .add(
765 room_participant::Column::AnsweringConnectionId
766 .eq(connection.id as i32),
767 )
768 .add(
769 room_participant::Column::AnsweringConnectionServerId
770 .eq(connection.owner_id as i32),
771 ),
772 )
773 .one(&*tx)
774 .await?;
775
776 if let Some(leaving_participant) = leaving_participant {
777 // Leave room.
778 let room_id = leaving_participant.room_id;
779 room_participant::Entity::delete_by_id(leaving_participant.id)
780 .exec(&*tx)
781 .await?;
782
783 // Cancel pending calls initiated by the leaving user.
784 let called_participants = room_participant::Entity::find()
785 .filter(
786 Condition::all()
787 .add(
788 room_participant::Column::CallingUserId
789 .eq(leaving_participant.user_id),
790 )
791 .add(room_participant::Column::AnsweringConnectionId.is_null()),
792 )
793 .all(&*tx)
794 .await?;
795 room_participant::Entity::delete_many()
796 .filter(
797 room_participant::Column::Id
798 .is_in(called_participants.iter().map(|participant| participant.id)),
799 )
800 .exec(&*tx)
801 .await?;
802 let canceled_calls_to_user_ids = called_participants
803 .into_iter()
804 .map(|participant| participant.user_id)
805 .collect();
806
807 // Detect left projects.
808 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
809 enum QueryProjectIds {
810 ProjectId,
811 }
812 let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
813 .select_only()
814 .column_as(
815 project_collaborator::Column::ProjectId,
816 QueryProjectIds::ProjectId,
817 )
818 .filter(
819 Condition::all()
820 .add(
821 project_collaborator::Column::ConnectionId.eq(connection.id as i32),
822 )
823 .add(
824 project_collaborator::Column::ConnectionServerId
825 .eq(connection.owner_id as i32),
826 ),
827 )
828 .into_values::<_, QueryProjectIds>()
829 .all(&*tx)
830 .await?;
831 let mut left_projects = HashMap::default();
832 let mut collaborators = project_collaborator::Entity::find()
833 .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
834 .stream(&*tx)
835 .await?;
836 while let Some(collaborator) = collaborators.next().await {
837 let collaborator = collaborator?;
838 let left_project =
839 left_projects
840 .entry(collaborator.project_id)
841 .or_insert(LeftProject {
842 id: collaborator.project_id,
843 host_user_id: Default::default(),
844 connection_ids: Default::default(),
845 host_connection_id: None,
846 });
847
848 let collaborator_connection_id = collaborator.connection();
849 if collaborator_connection_id != connection {
850 left_project.connection_ids.push(collaborator_connection_id);
851 }
852
853 if collaborator.is_host {
854 left_project.host_user_id = Some(collaborator.user_id);
855 left_project.host_connection_id = Some(collaborator_connection_id);
856 }
857 }
858 drop(collaborators);
859
860 // Leave projects.
861 project_collaborator::Entity::delete_many()
862 .filter(
863 Condition::all()
864 .add(
865 project_collaborator::Column::ConnectionId.eq(connection.id as i32),
866 )
867 .add(
868 project_collaborator::Column::ConnectionServerId
869 .eq(connection.owner_id as i32),
870 ),
871 )
872 .exec(&*tx)
873 .await?;
874
875 follower::Entity::delete_many()
876 .filter(
877 Condition::all()
878 .add(follower::Column::FollowerConnectionId.eq(connection.id as i32)),
879 )
880 .exec(&*tx)
881 .await?;
882
883 // Unshare projects.
884 project::Entity::delete_many()
885 .filter(
886 Condition::all()
887 .add(project::Column::RoomId.eq(room_id))
888 .add(project::Column::HostConnectionId.eq(connection.id as i32))
889 .add(
890 project::Column::HostConnectionServerId
891 .eq(connection.owner_id as i32),
892 ),
893 )
894 .exec(&*tx)
895 .await?;
896
897 let (channel, room) = self.get_channel_room(room_id, &tx).await?;
898 let deleted = if room.participants.is_empty() {
899 let result = room::Entity::delete_by_id(room_id).exec(&*tx).await?;
900 result.rows_affected > 0
901 } else {
902 false
903 };
904
905 let channel_members = if let Some(channel) = &channel {
906 self.get_channel_participants(channel, &tx).await?
907 } else {
908 Vec::new()
909 };
910 let left_room = LeftRoom {
911 room,
912 channel_id: channel.map(|channel| channel.id),
913 channel_members,
914 left_projects,
915 canceled_calls_to_user_ids,
916 deleted,
917 };
918
919 if left_room.room.participants.is_empty() {
920 self.rooms.remove(&room_id);
921 }
922
923 Ok(Some((room_id, left_room)))
924 } else {
925 Ok(None)
926 }
927 })
928 .await
929 }
930
931 /// Updates the location of a participant in the given room.
932 pub async fn update_room_participant_location(
933 &self,
934 room_id: RoomId,
935 connection: ConnectionId,
936 location: proto::ParticipantLocation,
937 ) -> Result<RoomGuard<proto::Room>> {
938 self.room_transaction(room_id, |tx| async {
939 let tx = tx;
940 let location_kind;
941 let location_project_id;
942 match location
943 .variant
944 .as_ref()
945 .ok_or_else(|| anyhow!("invalid location"))?
946 {
947 proto::participant_location::Variant::SharedProject(project) => {
948 location_kind = 0;
949 location_project_id = Some(ProjectId::from_proto(project.id));
950 }
951 proto::participant_location::Variant::UnsharedProject(_) => {
952 location_kind = 1;
953 location_project_id = None;
954 }
955 proto::participant_location::Variant::External(_) => {
956 location_kind = 2;
957 location_project_id = None;
958 }
959 }
960
961 let result = room_participant::Entity::update_many()
962 .filter(
963 Condition::all()
964 .add(room_participant::Column::RoomId.eq(room_id))
965 .add(
966 room_participant::Column::AnsweringConnectionId
967 .eq(connection.id as i32),
968 )
969 .add(
970 room_participant::Column::AnsweringConnectionServerId
971 .eq(connection.owner_id as i32),
972 ),
973 )
974 .set(room_participant::ActiveModel {
975 location_kind: ActiveValue::set(Some(location_kind)),
976 location_project_id: ActiveValue::set(location_project_id),
977 ..Default::default()
978 })
979 .exec(&*tx)
980 .await?;
981
982 if result.rows_affected == 1 {
983 let room = self.get_room(room_id, &tx).await?;
984 Ok(room)
985 } else {
986 Err(anyhow!("could not update room participant location"))?
987 }
988 })
989 .await
990 }
991
992 /// Sets the role of a participant in the given room.
993 pub async fn set_room_participant_role(
994 &self,
995 admin_id: UserId,
996 room_id: RoomId,
997 user_id: UserId,
998 role: ChannelRole,
999 ) -> Result<RoomGuard<proto::Room>> {
1000 self.room_transaction(room_id, |tx| async move {
1001 room_participant::Entity::find()
1002 .filter(
1003 Condition::all()
1004 .add(room_participant::Column::RoomId.eq(room_id))
1005 .add(room_participant::Column::UserId.eq(admin_id))
1006 .add(room_participant::Column::Role.eq(ChannelRole::Admin)),
1007 )
1008 .one(&*tx)
1009 .await?
1010 .ok_or_else(|| anyhow!("only admins can set participant role"))?;
1011
1012 if role.requires_cla() {
1013 self.check_user_has_signed_cla(user_id, room_id, &tx)
1014 .await?;
1015 }
1016
1017 let result = room_participant::Entity::update_many()
1018 .filter(
1019 Condition::all()
1020 .add(room_participant::Column::RoomId.eq(room_id))
1021 .add(room_participant::Column::UserId.eq(user_id)),
1022 )
1023 .set(room_participant::ActiveModel {
1024 role: ActiveValue::set(Some(role)),
1025 ..Default::default()
1026 })
1027 .exec(&*tx)
1028 .await?;
1029
1030 if result.rows_affected != 1 {
1031 Err(anyhow!("could not update room participant role"))?;
1032 }
1033 self.get_room(room_id, &tx).await
1034 })
1035 .await
1036 }
1037
1038 async fn check_user_has_signed_cla(
1039 &self,
1040 user_id: UserId,
1041 room_id: RoomId,
1042 tx: &DatabaseTransaction,
1043 ) -> Result<()> {
1044 let channel = room::Entity::find_by_id(room_id)
1045 .one(tx)
1046 .await?
1047 .ok_or_else(|| anyhow!("could not find room"))?
1048 .find_related(channel::Entity)
1049 .one(tx)
1050 .await?;
1051
1052 if let Some(channel) = channel {
1053 let requires_zed_cla = channel.requires_zed_cla
1054 || channel::Entity::find()
1055 .filter(
1056 channel::Column::Id
1057 .is_in(channel.ancestors())
1058 .and(channel::Column::RequiresZedCla.eq(true)),
1059 )
1060 .count(tx)
1061 .await?
1062 > 0;
1063 if requires_zed_cla {
1064 if contributor::Entity::find()
1065 .filter(contributor::Column::UserId.eq(user_id))
1066 .one(tx)
1067 .await?
1068 .is_none()
1069 {
1070 Err(anyhow!("user has not signed the Zed CLA"))?;
1071 }
1072 }
1073 }
1074 Ok(())
1075 }
1076
1077 pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
1078 self.transaction(|tx| async move {
1079 self.room_connection_lost(connection, &tx).await?;
1080 self.channel_buffer_connection_lost(connection, &tx).await?;
1081 self.channel_chat_connection_lost(connection, &tx).await?;
1082 Ok(())
1083 })
1084 .await
1085 }
1086
1087 pub async fn room_connection_lost(
1088 &self,
1089 connection: ConnectionId,
1090 tx: &DatabaseTransaction,
1091 ) -> Result<()> {
1092 let participant = room_participant::Entity::find()
1093 .filter(
1094 Condition::all()
1095 .add(room_participant::Column::AnsweringConnectionId.eq(connection.id as i32))
1096 .add(
1097 room_participant::Column::AnsweringConnectionServerId
1098 .eq(connection.owner_id as i32),
1099 ),
1100 )
1101 .one(tx)
1102 .await?;
1103
1104 if let Some(participant) = participant {
1105 room_participant::Entity::update(room_participant::ActiveModel {
1106 answering_connection_lost: ActiveValue::set(true),
1107 ..participant.into_active_model()
1108 })
1109 .exec(tx)
1110 .await?;
1111 }
1112 Ok(())
1113 }
1114
1115 fn build_incoming_call(
1116 room: &proto::Room,
1117 called_user_id: UserId,
1118 ) -> Option<proto::IncomingCall> {
1119 let pending_participant = room
1120 .pending_participants
1121 .iter()
1122 .find(|participant| participant.user_id == called_user_id.to_proto())?;
1123
1124 Some(proto::IncomingCall {
1125 room_id: room.id,
1126 calling_user_id: pending_participant.calling_user_id,
1127 participant_user_ids: room
1128 .participants
1129 .iter()
1130 .map(|participant| participant.user_id)
1131 .collect(),
1132 initial_project: room.participants.iter().find_map(|participant| {
1133 let initial_project_id = pending_participant.initial_project_id?;
1134 participant
1135 .projects
1136 .iter()
1137 .find(|project| project.id == initial_project_id)
1138 .cloned()
1139 }),
1140 })
1141 }
1142
1143 pub async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1144 let (_, room) = self.get_channel_room(room_id, tx).await?;
1145 Ok(room)
1146 }
1147
1148 pub async fn room_connection_ids(
1149 &self,
1150 room_id: RoomId,
1151 connection_id: ConnectionId,
1152 ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
1153 self.room_transaction(room_id, |tx| async move {
1154 let mut participants = room_participant::Entity::find()
1155 .filter(room_participant::Column::RoomId.eq(room_id))
1156 .stream(&*tx)
1157 .await?;
1158
1159 let mut is_participant = false;
1160 let mut connection_ids = HashSet::default();
1161 while let Some(participant) = participants.next().await {
1162 let participant = participant?;
1163 if let Some(answering_connection) = participant.answering_connection() {
1164 if answering_connection == connection_id {
1165 is_participant = true;
1166 } else {
1167 connection_ids.insert(answering_connection);
1168 }
1169 }
1170 }
1171
1172 if !is_participant {
1173 Err(anyhow!("not a room participant"))?;
1174 }
1175
1176 Ok(connection_ids)
1177 })
1178 .await
1179 }
1180
1181 async fn get_channel_room(
1182 &self,
1183 room_id: RoomId,
1184 tx: &DatabaseTransaction,
1185 ) -> Result<(Option<channel::Model>, proto::Room)> {
1186 let db_room = room::Entity::find_by_id(room_id)
1187 .one(tx)
1188 .await?
1189 .ok_or_else(|| anyhow!("could not find room"))?;
1190
1191 let mut db_participants = db_room
1192 .find_related(room_participant::Entity)
1193 .stream(tx)
1194 .await?;
1195 let mut participants = HashMap::default();
1196 let mut pending_participants = Vec::new();
1197 while let Some(db_participant) = db_participants.next().await {
1198 let db_participant = db_participant?;
1199 if let (
1200 Some(answering_connection_id),
1201 Some(answering_connection_server_id),
1202 Some(participant_index),
1203 ) = (
1204 db_participant.answering_connection_id,
1205 db_participant.answering_connection_server_id,
1206 db_participant.participant_index,
1207 ) {
1208 let location = match (
1209 db_participant.location_kind,
1210 db_participant.location_project_id,
1211 ) {
1212 (Some(0), Some(project_id)) => {
1213 Some(proto::participant_location::Variant::SharedProject(
1214 proto::participant_location::SharedProject {
1215 id: project_id.to_proto(),
1216 },
1217 ))
1218 }
1219 (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1220 Default::default(),
1221 )),
1222 _ => Some(proto::participant_location::Variant::External(
1223 Default::default(),
1224 )),
1225 };
1226
1227 let answering_connection = ConnectionId {
1228 owner_id: answering_connection_server_id.0 as u32,
1229 id: answering_connection_id as u32,
1230 };
1231 participants.insert(
1232 answering_connection,
1233 proto::Participant {
1234 user_id: db_participant.user_id.to_proto(),
1235 peer_id: Some(answering_connection.into()),
1236 projects: Default::default(),
1237 location: Some(proto::ParticipantLocation { variant: location }),
1238 participant_index: participant_index as u32,
1239 role: db_participant.role.unwrap_or(ChannelRole::Member).into(),
1240 },
1241 );
1242 } else {
1243 pending_participants.push(proto::PendingParticipant {
1244 user_id: db_participant.user_id.to_proto(),
1245 calling_user_id: db_participant.calling_user_id.to_proto(),
1246 initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1247 });
1248 }
1249 }
1250 drop(db_participants);
1251
1252 let mut db_projects = db_room
1253 .find_related(project::Entity)
1254 .find_with_related(worktree::Entity)
1255 .stream(tx)
1256 .await?;
1257
1258 while let Some(row) = db_projects.next().await {
1259 let (db_project, db_worktree) = row?;
1260 let host_connection = db_project.host_connection()?;
1261 if let Some(participant) = participants.get_mut(&host_connection) {
1262 let project = if let Some(project) = participant
1263 .projects
1264 .iter_mut()
1265 .find(|project| project.id == db_project.id.to_proto())
1266 {
1267 project
1268 } else {
1269 participant.projects.push(proto::ParticipantProject {
1270 id: db_project.id.to_proto(),
1271 worktree_root_names: Default::default(),
1272 });
1273 participant.projects.last_mut().unwrap()
1274 };
1275
1276 if let Some(db_worktree) = db_worktree {
1277 if db_worktree.visible {
1278 project.worktree_root_names.push(db_worktree.root_name);
1279 }
1280 }
1281 }
1282 }
1283 drop(db_projects);
1284
1285 let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
1286 let mut followers = Vec::new();
1287 while let Some(db_follower) = db_followers.next().await {
1288 let db_follower = db_follower?;
1289 followers.push(proto::Follower {
1290 leader_id: Some(db_follower.leader_connection().into()),
1291 follower_id: Some(db_follower.follower_connection().into()),
1292 project_id: db_follower.project_id.to_proto(),
1293 });
1294 }
1295 drop(db_followers);
1296
1297 let channel = if let Some(channel_id) = db_room.channel_id {
1298 Some(self.get_channel_internal(channel_id, tx).await?)
1299 } else {
1300 None
1301 };
1302
1303 Ok((
1304 channel,
1305 proto::Room {
1306 id: db_room.id.to_proto(),
1307 live_kit_room: db_room.live_kit_room,
1308 participants: participants.into_values().collect(),
1309 pending_participants,
1310 followers,
1311 },
1312 ))
1313 }
1314}