1use super::*;
2
3impl Database {
4 /// Clears all room participants in rooms attached to a stale server.
5 pub async fn clear_stale_room_participants(
6 &self,
7 room_id: RoomId,
8 new_server_id: ServerId,
9 ) -> Result<RoomGuard<RefreshedRoom>> {
10 self.room_transaction(room_id, |tx| async move {
11 let stale_participant_filter = Condition::all()
12 .add(room_participant::Column::RoomId.eq(room_id))
13 .add(room_participant::Column::AnsweringConnectionId.is_not_null())
14 .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
15
16 let stale_participant_user_ids = room_participant::Entity::find()
17 .filter(stale_participant_filter.clone())
18 .all(&*tx)
19 .await?
20 .into_iter()
21 .map(|participant| participant.user_id)
22 .collect::<Vec<_>>();
23
24 // Delete participants who failed to reconnect and cancel their calls.
25 let mut canceled_calls_to_user_ids = Vec::new();
26 room_participant::Entity::delete_many()
27 .filter(stale_participant_filter)
28 .exec(&*tx)
29 .await?;
30 let called_participants = room_participant::Entity::find()
31 .filter(
32 Condition::all()
33 .add(
34 room_participant::Column::CallingUserId
35 .is_in(stale_participant_user_ids.iter().copied()),
36 )
37 .add(room_participant::Column::AnsweringConnectionId.is_null()),
38 )
39 .all(&*tx)
40 .await?;
41 room_participant::Entity::delete_many()
42 .filter(
43 room_participant::Column::Id
44 .is_in(called_participants.iter().map(|participant| participant.id)),
45 )
46 .exec(&*tx)
47 .await?;
48 canceled_calls_to_user_ids.extend(
49 called_participants
50 .into_iter()
51 .map(|participant| participant.user_id),
52 );
53
54 let (channel, room) = self.get_channel_room(room_id, &tx).await?;
55 let channel_members;
56 if let Some(channel) = &channel {
57 channel_members = self.get_channel_participants(channel, &tx).await?;
58 } else {
59 channel_members = Vec::new();
60
61 // Delete the room if it becomes empty.
62 if room.participants.is_empty() {
63 project::Entity::delete_many()
64 .filter(project::Column::RoomId.eq(room_id))
65 .exec(&*tx)
66 .await?;
67 room::Entity::delete_by_id(room_id).exec(&*tx).await?;
68 }
69 };
70
71 Ok(RefreshedRoom {
72 room,
73 channel_id: channel.map(|channel| channel.id),
74 channel_members,
75 stale_participant_user_ids,
76 canceled_calls_to_user_ids,
77 })
78 })
79 .await
80 }
81
82 /// Returns the incoming calls for user with the given ID.
83 pub async fn incoming_call_for_user(
84 &self,
85 user_id: UserId,
86 ) -> Result<Option<proto::IncomingCall>> {
87 self.transaction(|tx| async move {
88 let pending_participant = room_participant::Entity::find()
89 .filter(
90 room_participant::Column::UserId
91 .eq(user_id)
92 .and(room_participant::Column::AnsweringConnectionId.is_null()),
93 )
94 .one(&*tx)
95 .await?;
96
97 if let Some(pending_participant) = pending_participant {
98 let room = self.get_room(pending_participant.room_id, &tx).await?;
99 Ok(Self::build_incoming_call(&room, user_id))
100 } else {
101 Ok(None)
102 }
103 })
104 .await
105 }
106
107 /// Creates a new room.
108 pub async fn create_room(
109 &self,
110 user_id: UserId,
111 connection: ConnectionId,
112 live_kit_room: &str,
113 ) -> Result<proto::Room> {
114 self.transaction(|tx| async move {
115 let room = room::ActiveModel {
116 live_kit_room: ActiveValue::set(live_kit_room.into()),
117 ..Default::default()
118 }
119 .insert(&*tx)
120 .await?;
121 room_participant::ActiveModel {
122 room_id: ActiveValue::set(room.id),
123 user_id: ActiveValue::set(user_id),
124 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
125 answering_connection_server_id: ActiveValue::set(Some(ServerId(
126 connection.owner_id as i32,
127 ))),
128 answering_connection_lost: ActiveValue::set(false),
129 calling_user_id: ActiveValue::set(user_id),
130 calling_connection_id: ActiveValue::set(connection.id as i32),
131 calling_connection_server_id: ActiveValue::set(Some(ServerId(
132 connection.owner_id as i32,
133 ))),
134 participant_index: ActiveValue::set(Some(0)),
135 role: ActiveValue::set(Some(ChannelRole::Admin)),
136
137 id: ActiveValue::NotSet,
138 location_kind: ActiveValue::NotSet,
139 location_project_id: ActiveValue::NotSet,
140 initial_project_id: ActiveValue::NotSet,
141 }
142 .insert(&*tx)
143 .await?;
144
145 let room = self.get_room(room.id, &tx).await?;
146 Ok(room)
147 })
148 .await
149 }
150
151 pub async fn call(
152 &self,
153 room_id: RoomId,
154 calling_user_id: UserId,
155 calling_connection: ConnectionId,
156 called_user_id: UserId,
157 initial_project_id: Option<ProjectId>,
158 ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
159 self.room_transaction(room_id, |tx| async move {
160 let caller = room_participant::Entity::find()
161 .filter(
162 room_participant::Column::UserId
163 .eq(calling_user_id)
164 .and(room_participant::Column::RoomId.eq(room_id)),
165 )
166 .one(&*tx)
167 .await?
168 .ok_or_else(|| anyhow!("user is not in the room"))?;
169
170 let called_user_role = match caller.role.unwrap_or(ChannelRole::Member) {
171 ChannelRole::Admin | ChannelRole::Member => ChannelRole::Member,
172 ChannelRole::Guest => ChannelRole::Guest,
173 ChannelRole::Banned => return Err(anyhow!("banned users cannot invite").into()),
174 };
175
176 room_participant::ActiveModel {
177 room_id: ActiveValue::set(room_id),
178 user_id: ActiveValue::set(called_user_id),
179 answering_connection_lost: ActiveValue::set(false),
180 participant_index: ActiveValue::NotSet,
181 calling_user_id: ActiveValue::set(calling_user_id),
182 calling_connection_id: ActiveValue::set(calling_connection.id as i32),
183 calling_connection_server_id: ActiveValue::set(Some(ServerId(
184 calling_connection.owner_id as i32,
185 ))),
186 initial_project_id: ActiveValue::set(initial_project_id),
187 role: ActiveValue::set(Some(called_user_role)),
188
189 id: ActiveValue::NotSet,
190 answering_connection_id: ActiveValue::NotSet,
191 answering_connection_server_id: ActiveValue::NotSet,
192 location_kind: ActiveValue::NotSet,
193 location_project_id: ActiveValue::NotSet,
194 }
195 .insert(&*tx)
196 .await?;
197
198 let room = self.get_room(room_id, &tx).await?;
199 let incoming_call = Self::build_incoming_call(&room, called_user_id)
200 .ok_or_else(|| anyhow!("failed to build incoming call"))?;
201 Ok((room, incoming_call))
202 })
203 .await
204 }
205
206 pub async fn call_failed(
207 &self,
208 room_id: RoomId,
209 called_user_id: UserId,
210 ) -> Result<RoomGuard<proto::Room>> {
211 self.room_transaction(room_id, |tx| async move {
212 room_participant::Entity::delete_many()
213 .filter(
214 room_participant::Column::RoomId
215 .eq(room_id)
216 .and(room_participant::Column::UserId.eq(called_user_id)),
217 )
218 .exec(&*tx)
219 .await?;
220 let room = self.get_room(room_id, &tx).await?;
221 Ok(room)
222 })
223 .await
224 }
225
226 pub async fn decline_call(
227 &self,
228 expected_room_id: Option<RoomId>,
229 user_id: UserId,
230 ) -> Result<Option<RoomGuard<proto::Room>>> {
231 self.optional_room_transaction(|tx| async move {
232 let mut filter = Condition::all()
233 .add(room_participant::Column::UserId.eq(user_id))
234 .add(room_participant::Column::AnsweringConnectionId.is_null());
235 if let Some(room_id) = expected_room_id {
236 filter = filter.add(room_participant::Column::RoomId.eq(room_id));
237 }
238 let participant = room_participant::Entity::find()
239 .filter(filter)
240 .one(&*tx)
241 .await?;
242
243 let participant = if let Some(participant) = participant {
244 participant
245 } else if expected_room_id.is_some() {
246 return Err(anyhow!("could not find call to decline"))?;
247 } else {
248 return Ok(None);
249 };
250
251 let room_id = participant.room_id;
252 room_participant::Entity::delete(participant.into_active_model())
253 .exec(&*tx)
254 .await?;
255
256 let room = self.get_room(room_id, &tx).await?;
257 Ok(Some((room_id, room)))
258 })
259 .await
260 }
261
262 pub async fn cancel_call(
263 &self,
264 room_id: RoomId,
265 calling_connection: ConnectionId,
266 called_user_id: UserId,
267 ) -> Result<RoomGuard<proto::Room>> {
268 self.room_transaction(room_id, |tx| async move {
269 let participant = room_participant::Entity::find()
270 .filter(
271 Condition::all()
272 .add(room_participant::Column::UserId.eq(called_user_id))
273 .add(room_participant::Column::RoomId.eq(room_id))
274 .add(
275 room_participant::Column::CallingConnectionId
276 .eq(calling_connection.id as i32),
277 )
278 .add(
279 room_participant::Column::CallingConnectionServerId
280 .eq(calling_connection.owner_id as i32),
281 )
282 .add(room_participant::Column::AnsweringConnectionId.is_null()),
283 )
284 .one(&*tx)
285 .await?
286 .ok_or_else(|| anyhow!("no call to cancel"))?;
287
288 room_participant::Entity::delete(participant.into_active_model())
289 .exec(&*tx)
290 .await?;
291
292 let room = self.get_room(room_id, &tx).await?;
293 Ok(room)
294 })
295 .await
296 }
297
298 pub async fn join_room(
299 &self,
300 room_id: RoomId,
301 user_id: UserId,
302 connection: ConnectionId,
303 ) -> Result<RoomGuard<JoinRoom>> {
304 self.room_transaction(room_id, |tx| async move {
305 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
306 enum QueryChannelId {
307 ChannelId,
308 }
309
310 let channel_id: Option<ChannelId> = room::Entity::find()
311 .select_only()
312 .column(room::Column::ChannelId)
313 .filter(room::Column::Id.eq(room_id))
314 .into_values::<_, QueryChannelId>()
315 .one(&*tx)
316 .await?
317 .ok_or_else(|| anyhow!("no such room"))?;
318
319 if channel_id.is_some() {
320 Err(anyhow!("tried to join channel call directly"))?
321 }
322
323 let participant_index = self
324 .get_next_participant_index_internal(room_id, &*tx)
325 .await?;
326
327 let result = room_participant::Entity::update_many()
328 .filter(
329 Condition::all()
330 .add(room_participant::Column::RoomId.eq(room_id))
331 .add(room_participant::Column::UserId.eq(user_id))
332 .add(room_participant::Column::AnsweringConnectionId.is_null()),
333 )
334 .set(room_participant::ActiveModel {
335 participant_index: ActiveValue::Set(Some(participant_index)),
336 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
337 answering_connection_server_id: ActiveValue::set(Some(ServerId(
338 connection.owner_id as i32,
339 ))),
340 answering_connection_lost: ActiveValue::set(false),
341 ..Default::default()
342 })
343 .exec(&*tx)
344 .await?;
345 if result.rows_affected == 0 {
346 Err(anyhow!("room does not exist or was already joined"))?;
347 }
348
349 let room = self.get_room(room_id, &tx).await?;
350 Ok(JoinRoom {
351 room,
352 channel_id: None,
353 channel_members: vec![],
354 })
355 })
356 .await
357 }
358
359 async fn get_next_participant_index_internal(
360 &self,
361 room_id: RoomId,
362 tx: &DatabaseTransaction,
363 ) -> Result<i32> {
364 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
365 enum QueryParticipantIndices {
366 ParticipantIndex,
367 }
368 let existing_participant_indices: Vec<i32> = room_participant::Entity::find()
369 .filter(
370 room_participant::Column::RoomId
371 .eq(room_id)
372 .and(room_participant::Column::ParticipantIndex.is_not_null()),
373 )
374 .select_only()
375 .column(room_participant::Column::ParticipantIndex)
376 .into_values::<_, QueryParticipantIndices>()
377 .all(&*tx)
378 .await?;
379
380 let mut participant_index = 0;
381 while existing_participant_indices.contains(&participant_index) {
382 participant_index += 1;
383 }
384
385 Ok(participant_index)
386 }
387
388 /// Returns the channel ID for the given room, if it has one.
389 pub async fn channel_id_for_room(&self, room_id: RoomId) -> Result<Option<ChannelId>> {
390 self.transaction(|tx| async move {
391 let room: Option<room::Model> = room::Entity::find()
392 .filter(room::Column::Id.eq(room_id))
393 .one(&*tx)
394 .await?;
395
396 Ok(room.and_then(|room| room.channel_id))
397 })
398 .await
399 }
400
401 pub(crate) async fn join_channel_room_internal(
402 &self,
403 room_id: RoomId,
404 user_id: UserId,
405 connection: ConnectionId,
406 role: ChannelRole,
407 tx: &DatabaseTransaction,
408 ) -> Result<JoinRoom> {
409 let participant_index = self
410 .get_next_participant_index_internal(room_id, &*tx)
411 .await?;
412
413 room_participant::Entity::insert_many([room_participant::ActiveModel {
414 room_id: ActiveValue::set(room_id),
415 user_id: ActiveValue::set(user_id),
416 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
417 answering_connection_server_id: ActiveValue::set(Some(ServerId(
418 connection.owner_id as i32,
419 ))),
420 answering_connection_lost: ActiveValue::set(false),
421 calling_user_id: ActiveValue::set(user_id),
422 calling_connection_id: ActiveValue::set(connection.id as i32),
423 calling_connection_server_id: ActiveValue::set(Some(ServerId(
424 connection.owner_id as i32,
425 ))),
426 participant_index: ActiveValue::Set(Some(participant_index)),
427 role: ActiveValue::set(Some(role)),
428 id: ActiveValue::NotSet,
429 location_kind: ActiveValue::NotSet,
430 location_project_id: ActiveValue::NotSet,
431 initial_project_id: ActiveValue::NotSet,
432 }])
433 .on_conflict(
434 OnConflict::columns([room_participant::Column::UserId])
435 .update_columns([
436 room_participant::Column::AnsweringConnectionId,
437 room_participant::Column::AnsweringConnectionServerId,
438 room_participant::Column::AnsweringConnectionLost,
439 room_participant::Column::ParticipantIndex,
440 room_participant::Column::Role,
441 ])
442 .to_owned(),
443 )
444 .exec(&*tx)
445 .await?;
446
447 let (channel, room) = self.get_channel_room(room_id, &tx).await?;
448 let channel = channel.ok_or_else(|| anyhow!("no channel for room"))?;
449 let channel_members = self.get_channel_participants(&channel, &*tx).await?;
450 Ok(JoinRoom {
451 room,
452 channel_id: Some(channel.id),
453 channel_members,
454 })
455 }
456
457 pub async fn rejoin_room(
458 &self,
459 rejoin_room: proto::RejoinRoom,
460 user_id: UserId,
461 connection: ConnectionId,
462 ) -> Result<RoomGuard<RejoinedRoom>> {
463 let room_id = RoomId::from_proto(rejoin_room.id);
464 self.room_transaction(room_id, |tx| async {
465 let tx = tx;
466 let participant_update = room_participant::Entity::update_many()
467 .filter(
468 Condition::all()
469 .add(room_participant::Column::RoomId.eq(room_id))
470 .add(room_participant::Column::UserId.eq(user_id))
471 .add(room_participant::Column::AnsweringConnectionId.is_not_null())
472 .add(
473 Condition::any()
474 .add(room_participant::Column::AnsweringConnectionLost.eq(true))
475 .add(
476 room_participant::Column::AnsweringConnectionServerId
477 .ne(connection.owner_id as i32),
478 ),
479 ),
480 )
481 .set(room_participant::ActiveModel {
482 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
483 answering_connection_server_id: ActiveValue::set(Some(ServerId(
484 connection.owner_id as i32,
485 ))),
486 answering_connection_lost: ActiveValue::set(false),
487 ..Default::default()
488 })
489 .exec(&*tx)
490 .await?;
491 if participant_update.rows_affected == 0 {
492 return Err(anyhow!("room does not exist or was already joined"))?;
493 }
494
495 let mut reshared_projects = Vec::new();
496 for reshared_project in &rejoin_room.reshared_projects {
497 let project_id = ProjectId::from_proto(reshared_project.project_id);
498 let project = project::Entity::find_by_id(project_id)
499 .one(&*tx)
500 .await?
501 .ok_or_else(|| anyhow!("project does not exist"))?;
502 if project.host_user_id != user_id {
503 return Err(anyhow!("no such project"))?;
504 }
505
506 let mut collaborators = project
507 .find_related(project_collaborator::Entity)
508 .all(&*tx)
509 .await?;
510 let host_ix = collaborators
511 .iter()
512 .position(|collaborator| {
513 collaborator.user_id == user_id && collaborator.is_host
514 })
515 .ok_or_else(|| anyhow!("host not found among collaborators"))?;
516 let host = collaborators.swap_remove(host_ix);
517 let old_connection_id = host.connection();
518
519 project::Entity::update(project::ActiveModel {
520 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
521 host_connection_server_id: ActiveValue::set(Some(ServerId(
522 connection.owner_id as i32,
523 ))),
524 ..project.into_active_model()
525 })
526 .exec(&*tx)
527 .await?;
528 project_collaborator::Entity::update(project_collaborator::ActiveModel {
529 connection_id: ActiveValue::set(connection.id as i32),
530 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
531 ..host.into_active_model()
532 })
533 .exec(&*tx)
534 .await?;
535
536 self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
537 .await?;
538
539 reshared_projects.push(ResharedProject {
540 id: project_id,
541 old_connection_id,
542 collaborators: collaborators
543 .iter()
544 .map(|collaborator| ProjectCollaborator {
545 connection_id: collaborator.connection(),
546 user_id: collaborator.user_id,
547 replica_id: collaborator.replica_id,
548 is_host: collaborator.is_host,
549 })
550 .collect(),
551 worktrees: reshared_project.worktrees.clone(),
552 });
553 }
554
555 project::Entity::delete_many()
556 .filter(
557 Condition::all()
558 .add(project::Column::RoomId.eq(room_id))
559 .add(project::Column::HostUserId.eq(user_id))
560 .add(
561 project::Column::Id
562 .is_not_in(reshared_projects.iter().map(|project| project.id)),
563 ),
564 )
565 .exec(&*tx)
566 .await?;
567
568 let mut rejoined_projects = Vec::new();
569 for rejoined_project in &rejoin_room.rejoined_projects {
570 let project_id = ProjectId::from_proto(rejoined_project.id);
571 let Some(project) = project::Entity::find_by_id(project_id).one(&*tx).await? else {
572 continue;
573 };
574
575 let mut worktrees = Vec::new();
576 let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
577 for db_worktree in db_worktrees {
578 let mut worktree = RejoinedWorktree {
579 id: db_worktree.id as u64,
580 abs_path: db_worktree.abs_path,
581 root_name: db_worktree.root_name,
582 visible: db_worktree.visible,
583 updated_entries: Default::default(),
584 removed_entries: Default::default(),
585 updated_repositories: Default::default(),
586 removed_repositories: Default::default(),
587 diagnostic_summaries: Default::default(),
588 settings_files: Default::default(),
589 scan_id: db_worktree.scan_id as u64,
590 completed_scan_id: db_worktree.completed_scan_id as u64,
591 };
592
593 let rejoined_worktree = rejoined_project
594 .worktrees
595 .iter()
596 .find(|worktree| worktree.id == db_worktree.id as u64);
597
598 // File entries
599 {
600 let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
601 worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
602 } else {
603 worktree_entry::Column::IsDeleted.eq(false)
604 };
605
606 let mut db_entries = worktree_entry::Entity::find()
607 .filter(
608 Condition::all()
609 .add(worktree_entry::Column::ProjectId.eq(project.id))
610 .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
611 .add(entry_filter),
612 )
613 .stream(&*tx)
614 .await?;
615
616 while let Some(db_entry) = db_entries.next().await {
617 let db_entry = db_entry?;
618 if db_entry.is_deleted {
619 worktree.removed_entries.push(db_entry.id as u64);
620 } else {
621 worktree.updated_entries.push(proto::Entry {
622 id: db_entry.id as u64,
623 is_dir: db_entry.is_dir,
624 path: db_entry.path,
625 inode: db_entry.inode as u64,
626 mtime: Some(proto::Timestamp {
627 seconds: db_entry.mtime_seconds as u64,
628 nanos: db_entry.mtime_nanos as u32,
629 }),
630 is_symlink: db_entry.is_symlink,
631 is_ignored: db_entry.is_ignored,
632 is_external: db_entry.is_external,
633 git_status: db_entry.git_status.map(|status| status as i32),
634 });
635 }
636 }
637 }
638
639 // Repository Entries
640 {
641 let repository_entry_filter =
642 if let Some(rejoined_worktree) = rejoined_worktree {
643 worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
644 } else {
645 worktree_repository::Column::IsDeleted.eq(false)
646 };
647
648 let mut db_repositories = worktree_repository::Entity::find()
649 .filter(
650 Condition::all()
651 .add(worktree_repository::Column::ProjectId.eq(project.id))
652 .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
653 .add(repository_entry_filter),
654 )
655 .stream(&*tx)
656 .await?;
657
658 while let Some(db_repository) = db_repositories.next().await {
659 let db_repository = db_repository?;
660 if db_repository.is_deleted {
661 worktree
662 .removed_repositories
663 .push(db_repository.work_directory_id as u64);
664 } else {
665 worktree.updated_repositories.push(proto::RepositoryEntry {
666 work_directory_id: db_repository.work_directory_id as u64,
667 branch: db_repository.branch,
668 });
669 }
670 }
671 }
672
673 worktrees.push(worktree);
674 }
675
676 let language_servers = project
677 .find_related(language_server::Entity)
678 .all(&*tx)
679 .await?
680 .into_iter()
681 .map(|language_server| proto::LanguageServer {
682 id: language_server.id as u64,
683 name: language_server.name,
684 })
685 .collect::<Vec<_>>();
686
687 {
688 let mut db_settings_files = worktree_settings_file::Entity::find()
689 .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
690 .stream(&*tx)
691 .await?;
692 while let Some(db_settings_file) = db_settings_files.next().await {
693 let db_settings_file = db_settings_file?;
694 if let Some(worktree) = worktrees
695 .iter_mut()
696 .find(|w| w.id == db_settings_file.worktree_id as u64)
697 {
698 worktree.settings_files.push(WorktreeSettingsFile {
699 path: db_settings_file.path,
700 content: db_settings_file.content,
701 });
702 }
703 }
704 }
705
706 let mut collaborators = project
707 .find_related(project_collaborator::Entity)
708 .all(&*tx)
709 .await?;
710 let self_collaborator = if let Some(self_collaborator_ix) = collaborators
711 .iter()
712 .position(|collaborator| collaborator.user_id == user_id)
713 {
714 collaborators.swap_remove(self_collaborator_ix)
715 } else {
716 continue;
717 };
718 let old_connection_id = self_collaborator.connection();
719 project_collaborator::Entity::update(project_collaborator::ActiveModel {
720 connection_id: ActiveValue::set(connection.id as i32),
721 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
722 ..self_collaborator.into_active_model()
723 })
724 .exec(&*tx)
725 .await?;
726
727 let collaborators = collaborators
728 .into_iter()
729 .map(|collaborator| ProjectCollaborator {
730 connection_id: collaborator.connection(),
731 user_id: collaborator.user_id,
732 replica_id: collaborator.replica_id,
733 is_host: collaborator.is_host,
734 })
735 .collect::<Vec<_>>();
736
737 rejoined_projects.push(RejoinedProject {
738 id: project_id,
739 old_connection_id,
740 collaborators,
741 worktrees,
742 language_servers,
743 });
744 }
745
746 let (channel, room) = self.get_channel_room(room_id, &tx).await?;
747 let channel_members = if let Some(channel) = &channel {
748 self.get_channel_participants(&channel, &tx).await?
749 } else {
750 Vec::new()
751 };
752
753 Ok(RejoinedRoom {
754 room,
755 channel_id: channel.map(|channel| channel.id),
756 channel_members,
757 rejoined_projects,
758 reshared_projects,
759 })
760 })
761 .await
762 }
763
764 pub async fn leave_room(
765 &self,
766 connection: ConnectionId,
767 ) -> Result<Option<RoomGuard<LeftRoom>>> {
768 self.optional_room_transaction(|tx| async move {
769 let leaving_participant = room_participant::Entity::find()
770 .filter(
771 Condition::all()
772 .add(
773 room_participant::Column::AnsweringConnectionId
774 .eq(connection.id as i32),
775 )
776 .add(
777 room_participant::Column::AnsweringConnectionServerId
778 .eq(connection.owner_id as i32),
779 ),
780 )
781 .one(&*tx)
782 .await?;
783
784 if let Some(leaving_participant) = leaving_participant {
785 // Leave room.
786 let room_id = leaving_participant.room_id;
787 room_participant::Entity::delete_by_id(leaving_participant.id)
788 .exec(&*tx)
789 .await?;
790
791 // Cancel pending calls initiated by the leaving user.
792 let called_participants = room_participant::Entity::find()
793 .filter(
794 Condition::all()
795 .add(
796 room_participant::Column::CallingUserId
797 .eq(leaving_participant.user_id),
798 )
799 .add(room_participant::Column::AnsweringConnectionId.is_null()),
800 )
801 .all(&*tx)
802 .await?;
803 room_participant::Entity::delete_many()
804 .filter(
805 room_participant::Column::Id
806 .is_in(called_participants.iter().map(|participant| participant.id)),
807 )
808 .exec(&*tx)
809 .await?;
810 let canceled_calls_to_user_ids = called_participants
811 .into_iter()
812 .map(|participant| participant.user_id)
813 .collect();
814
815 // Detect left projects.
816 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
817 enum QueryProjectIds {
818 ProjectId,
819 }
820 let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
821 .select_only()
822 .column_as(
823 project_collaborator::Column::ProjectId,
824 QueryProjectIds::ProjectId,
825 )
826 .filter(
827 Condition::all()
828 .add(
829 project_collaborator::Column::ConnectionId.eq(connection.id as i32),
830 )
831 .add(
832 project_collaborator::Column::ConnectionServerId
833 .eq(connection.owner_id as i32),
834 ),
835 )
836 .into_values::<_, QueryProjectIds>()
837 .all(&*tx)
838 .await?;
839 let mut left_projects = HashMap::default();
840 let mut collaborators = project_collaborator::Entity::find()
841 .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
842 .stream(&*tx)
843 .await?;
844 while let Some(collaborator) = collaborators.next().await {
845 let collaborator = collaborator?;
846 let left_project =
847 left_projects
848 .entry(collaborator.project_id)
849 .or_insert(LeftProject {
850 id: collaborator.project_id,
851 host_user_id: Default::default(),
852 connection_ids: Default::default(),
853 host_connection_id: None,
854 });
855
856 let collaborator_connection_id = collaborator.connection();
857 if collaborator_connection_id != connection {
858 left_project.connection_ids.push(collaborator_connection_id);
859 }
860
861 if collaborator.is_host {
862 left_project.host_user_id = collaborator.user_id;
863 left_project.host_connection_id = Some(collaborator_connection_id);
864 }
865 }
866 drop(collaborators);
867
868 // Leave projects.
869 project_collaborator::Entity::delete_many()
870 .filter(
871 Condition::all()
872 .add(
873 project_collaborator::Column::ConnectionId.eq(connection.id as i32),
874 )
875 .add(
876 project_collaborator::Column::ConnectionServerId
877 .eq(connection.owner_id as i32),
878 ),
879 )
880 .exec(&*tx)
881 .await?;
882
883 follower::Entity::delete_many()
884 .filter(
885 Condition::all()
886 .add(follower::Column::FollowerConnectionId.eq(connection.id as i32)),
887 )
888 .exec(&*tx)
889 .await?;
890
891 // Unshare projects.
892 project::Entity::delete_many()
893 .filter(
894 Condition::all()
895 .add(project::Column::RoomId.eq(room_id))
896 .add(project::Column::HostConnectionId.eq(connection.id as i32))
897 .add(
898 project::Column::HostConnectionServerId
899 .eq(connection.owner_id as i32),
900 ),
901 )
902 .exec(&*tx)
903 .await?;
904
905 let (channel, room) = self.get_channel_room(room_id, &tx).await?;
906 let deleted = if room.participants.is_empty() {
907 let result = room::Entity::delete_by_id(room_id).exec(&*tx).await?;
908 result.rows_affected > 0
909 } else {
910 false
911 };
912
913 let channel_members = if let Some(channel) = &channel {
914 self.get_channel_participants(channel, &tx).await?
915 } else {
916 Vec::new()
917 };
918 let left_room = LeftRoom {
919 room,
920 channel_id: channel.map(|channel| channel.id),
921 channel_members,
922 left_projects,
923 canceled_calls_to_user_ids,
924 deleted,
925 };
926
927 if left_room.room.participants.is_empty() {
928 self.rooms.remove(&room_id);
929 }
930
931 Ok(Some((room_id, left_room)))
932 } else {
933 Ok(None)
934 }
935 })
936 .await
937 }
938
939 /// Updates the location of a participant in the given room.
940 pub async fn update_room_participant_location(
941 &self,
942 room_id: RoomId,
943 connection: ConnectionId,
944 location: proto::ParticipantLocation,
945 ) -> Result<RoomGuard<proto::Room>> {
946 self.room_transaction(room_id, |tx| async {
947 let tx = tx;
948 let location_kind;
949 let location_project_id;
950 match location
951 .variant
952 .as_ref()
953 .ok_or_else(|| anyhow!("invalid location"))?
954 {
955 proto::participant_location::Variant::SharedProject(project) => {
956 location_kind = 0;
957 location_project_id = Some(ProjectId::from_proto(project.id));
958 }
959 proto::participant_location::Variant::UnsharedProject(_) => {
960 location_kind = 1;
961 location_project_id = None;
962 }
963 proto::participant_location::Variant::External(_) => {
964 location_kind = 2;
965 location_project_id = None;
966 }
967 }
968
969 let result = room_participant::Entity::update_many()
970 .filter(
971 Condition::all()
972 .add(room_participant::Column::RoomId.eq(room_id))
973 .add(
974 room_participant::Column::AnsweringConnectionId
975 .eq(connection.id as i32),
976 )
977 .add(
978 room_participant::Column::AnsweringConnectionServerId
979 .eq(connection.owner_id as i32),
980 ),
981 )
982 .set(room_participant::ActiveModel {
983 location_kind: ActiveValue::set(Some(location_kind)),
984 location_project_id: ActiveValue::set(location_project_id),
985 ..Default::default()
986 })
987 .exec(&*tx)
988 .await?;
989
990 if result.rows_affected == 1 {
991 let room = self.get_room(room_id, &tx).await?;
992 Ok(room)
993 } else {
994 Err(anyhow!("could not update room participant location"))?
995 }
996 })
997 .await
998 }
999
1000 /// Sets the role of a participant in the given room.
1001 pub async fn set_room_participant_role(
1002 &self,
1003 admin_id: UserId,
1004 room_id: RoomId,
1005 user_id: UserId,
1006 role: ChannelRole,
1007 ) -> Result<RoomGuard<proto::Room>> {
1008 self.room_transaction(room_id, |tx| async move {
1009 room_participant::Entity::find()
1010 .filter(
1011 Condition::all()
1012 .add(room_participant::Column::RoomId.eq(room_id))
1013 .add(room_participant::Column::UserId.eq(admin_id))
1014 .add(room_participant::Column::Role.eq(ChannelRole::Admin)),
1015 )
1016 .one(&*tx)
1017 .await?
1018 .ok_or_else(|| anyhow!("only admins can set participant role"))?;
1019
1020 if role.requires_cla() {
1021 self.check_user_has_signed_cla(user_id, room_id, &*tx)
1022 .await?;
1023 }
1024
1025 let result = room_participant::Entity::update_many()
1026 .filter(
1027 Condition::all()
1028 .add(room_participant::Column::RoomId.eq(room_id))
1029 .add(room_participant::Column::UserId.eq(user_id)),
1030 )
1031 .set(room_participant::ActiveModel {
1032 role: ActiveValue::set(Some(ChannelRole::from(role))),
1033 ..Default::default()
1034 })
1035 .exec(&*tx)
1036 .await?;
1037
1038 if result.rows_affected != 1 {
1039 Err(anyhow!("could not update room participant role"))?;
1040 }
1041 Ok(self.get_room(room_id, &tx).await?)
1042 })
1043 .await
1044 }
1045
1046 async fn check_user_has_signed_cla(
1047 &self,
1048 user_id: UserId,
1049 room_id: RoomId,
1050 tx: &DatabaseTransaction,
1051 ) -> Result<()> {
1052 let channel = room::Entity::find_by_id(room_id)
1053 .one(&*tx)
1054 .await?
1055 .ok_or_else(|| anyhow!("could not find room"))?
1056 .find_related(channel::Entity)
1057 .one(&*tx)
1058 .await?;
1059
1060 if let Some(channel) = channel {
1061 let requires_zed_cla = channel.requires_zed_cla
1062 || channel::Entity::find()
1063 .filter(
1064 channel::Column::Id
1065 .is_in(channel.ancestors())
1066 .and(channel::Column::RequiresZedCla.eq(true)),
1067 )
1068 .count(&*tx)
1069 .await?
1070 > 0;
1071 if requires_zed_cla {
1072 if contributor::Entity::find()
1073 .filter(contributor::Column::UserId.eq(user_id))
1074 .one(&*tx)
1075 .await?
1076 .is_none()
1077 {
1078 Err(anyhow!("user has not signed the Zed CLA"))?;
1079 }
1080 }
1081 }
1082 Ok(())
1083 }
1084
1085 pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
1086 self.transaction(|tx| async move {
1087 self.room_connection_lost(connection, &*tx).await?;
1088 self.channel_buffer_connection_lost(connection, &*tx)
1089 .await?;
1090 self.channel_chat_connection_lost(connection, &*tx).await?;
1091 Ok(())
1092 })
1093 .await
1094 }
1095
1096 pub async fn room_connection_lost(
1097 &self,
1098 connection: ConnectionId,
1099 tx: &DatabaseTransaction,
1100 ) -> Result<()> {
1101 let participant = room_participant::Entity::find()
1102 .filter(
1103 Condition::all()
1104 .add(room_participant::Column::AnsweringConnectionId.eq(connection.id as i32))
1105 .add(
1106 room_participant::Column::AnsweringConnectionServerId
1107 .eq(connection.owner_id as i32),
1108 ),
1109 )
1110 .one(&*tx)
1111 .await?;
1112
1113 if let Some(participant) = participant {
1114 room_participant::Entity::update(room_participant::ActiveModel {
1115 answering_connection_lost: ActiveValue::set(true),
1116 ..participant.into_active_model()
1117 })
1118 .exec(&*tx)
1119 .await?;
1120 }
1121 Ok(())
1122 }
1123
1124 fn build_incoming_call(
1125 room: &proto::Room,
1126 called_user_id: UserId,
1127 ) -> Option<proto::IncomingCall> {
1128 let pending_participant = room
1129 .pending_participants
1130 .iter()
1131 .find(|participant| participant.user_id == called_user_id.to_proto())?;
1132
1133 Some(proto::IncomingCall {
1134 room_id: room.id,
1135 calling_user_id: pending_participant.calling_user_id,
1136 participant_user_ids: room
1137 .participants
1138 .iter()
1139 .map(|participant| participant.user_id)
1140 .collect(),
1141 initial_project: room.participants.iter().find_map(|participant| {
1142 let initial_project_id = pending_participant.initial_project_id?;
1143 participant
1144 .projects
1145 .iter()
1146 .find(|project| project.id == initial_project_id)
1147 .cloned()
1148 }),
1149 })
1150 }
1151
1152 pub async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1153 let (_, room) = self.get_channel_room(room_id, tx).await?;
1154 Ok(room)
1155 }
1156
1157 pub async fn room_connection_ids(
1158 &self,
1159 room_id: RoomId,
1160 connection_id: ConnectionId,
1161 ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
1162 self.room_transaction(room_id, |tx| async move {
1163 let mut participants = room_participant::Entity::find()
1164 .filter(room_participant::Column::RoomId.eq(room_id))
1165 .stream(&*tx)
1166 .await?;
1167
1168 let mut is_participant = false;
1169 let mut connection_ids = HashSet::default();
1170 while let Some(participant) = participants.next().await {
1171 let participant = participant?;
1172 if let Some(answering_connection) = participant.answering_connection() {
1173 if answering_connection == connection_id {
1174 is_participant = true;
1175 } else {
1176 connection_ids.insert(answering_connection);
1177 }
1178 }
1179 }
1180
1181 if !is_participant {
1182 Err(anyhow!("not a room participant"))?;
1183 }
1184
1185 Ok(connection_ids)
1186 })
1187 .await
1188 }
1189
1190 async fn get_channel_room(
1191 &self,
1192 room_id: RoomId,
1193 tx: &DatabaseTransaction,
1194 ) -> Result<(Option<channel::Model>, proto::Room)> {
1195 let db_room = room::Entity::find_by_id(room_id)
1196 .one(tx)
1197 .await?
1198 .ok_or_else(|| anyhow!("could not find room"))?;
1199
1200 let mut db_participants = db_room
1201 .find_related(room_participant::Entity)
1202 .stream(tx)
1203 .await?;
1204 let mut participants = HashMap::default();
1205 let mut pending_participants = Vec::new();
1206 while let Some(db_participant) = db_participants.next().await {
1207 let db_participant = db_participant?;
1208 if let (
1209 Some(answering_connection_id),
1210 Some(answering_connection_server_id),
1211 Some(participant_index),
1212 ) = (
1213 db_participant.answering_connection_id,
1214 db_participant.answering_connection_server_id,
1215 db_participant.participant_index,
1216 ) {
1217 let location = match (
1218 db_participant.location_kind,
1219 db_participant.location_project_id,
1220 ) {
1221 (Some(0), Some(project_id)) => {
1222 Some(proto::participant_location::Variant::SharedProject(
1223 proto::participant_location::SharedProject {
1224 id: project_id.to_proto(),
1225 },
1226 ))
1227 }
1228 (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1229 Default::default(),
1230 )),
1231 _ => Some(proto::participant_location::Variant::External(
1232 Default::default(),
1233 )),
1234 };
1235
1236 let answering_connection = ConnectionId {
1237 owner_id: answering_connection_server_id.0 as u32,
1238 id: answering_connection_id as u32,
1239 };
1240 participants.insert(
1241 answering_connection,
1242 proto::Participant {
1243 user_id: db_participant.user_id.to_proto(),
1244 peer_id: Some(answering_connection.into()),
1245 projects: Default::default(),
1246 location: Some(proto::ParticipantLocation { variant: location }),
1247 participant_index: participant_index as u32,
1248 role: db_participant.role.unwrap_or(ChannelRole::Member).into(),
1249 },
1250 );
1251 } else {
1252 pending_participants.push(proto::PendingParticipant {
1253 user_id: db_participant.user_id.to_proto(),
1254 calling_user_id: db_participant.calling_user_id.to_proto(),
1255 initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1256 });
1257 }
1258 }
1259 drop(db_participants);
1260
1261 let mut db_projects = db_room
1262 .find_related(project::Entity)
1263 .find_with_related(worktree::Entity)
1264 .stream(tx)
1265 .await?;
1266
1267 while let Some(row) = db_projects.next().await {
1268 let (db_project, db_worktree) = row?;
1269 let host_connection = db_project.host_connection()?;
1270 if let Some(participant) = participants.get_mut(&host_connection) {
1271 let project = if let Some(project) = participant
1272 .projects
1273 .iter_mut()
1274 .find(|project| project.id == db_project.id.to_proto())
1275 {
1276 project
1277 } else {
1278 participant.projects.push(proto::ParticipantProject {
1279 id: db_project.id.to_proto(),
1280 worktree_root_names: Default::default(),
1281 });
1282 participant.projects.last_mut().unwrap()
1283 };
1284
1285 if let Some(db_worktree) = db_worktree {
1286 if db_worktree.visible {
1287 project.worktree_root_names.push(db_worktree.root_name);
1288 }
1289 }
1290 }
1291 }
1292 drop(db_projects);
1293
1294 let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
1295 let mut followers = Vec::new();
1296 while let Some(db_follower) = db_followers.next().await {
1297 let db_follower = db_follower?;
1298 followers.push(proto::Follower {
1299 leader_id: Some(db_follower.leader_connection().into()),
1300 follower_id: Some(db_follower.follower_connection().into()),
1301 project_id: db_follower.project_id.to_proto(),
1302 });
1303 }
1304 drop(db_followers);
1305
1306 let channel = if let Some(channel_id) = db_room.channel_id {
1307 Some(self.get_channel_internal(channel_id, &*tx).await?)
1308 } else {
1309 None
1310 };
1311
1312 Ok((
1313 channel,
1314 proto::Room {
1315 id: db_room.id.to_proto(),
1316 live_kit_room: db_room.live_kit_room,
1317 participants: participants.into_values().collect(),
1318 pending_participants,
1319 followers,
1320 },
1321 ))
1322 }
1323}