1use super::*;
2
3impl Database {
4 /// Clears all room participants in rooms attached to a stale server.
5 pub async fn clear_stale_room_participants(
6 &self,
7 room_id: RoomId,
8 new_server_id: ServerId,
9 ) -> Result<TransactionGuard<RefreshedRoom>> {
10 self.room_transaction(room_id, |tx| async move {
11 let stale_participant_filter = Condition::all()
12 .add(room_participant::Column::RoomId.eq(room_id))
13 .add(room_participant::Column::AnsweringConnectionId.is_not_null())
14 .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
15
16 let stale_participant_user_ids = room_participant::Entity::find()
17 .filter(stale_participant_filter.clone())
18 .all(&*tx)
19 .await?
20 .into_iter()
21 .map(|participant| participant.user_id)
22 .collect::<Vec<_>>();
23
24 // Delete participants who failed to reconnect and cancel their calls.
25 let mut canceled_calls_to_user_ids = Vec::new();
26 room_participant::Entity::delete_many()
27 .filter(stale_participant_filter)
28 .exec(&*tx)
29 .await?;
30 let called_participants = room_participant::Entity::find()
31 .filter(
32 Condition::all()
33 .add(
34 room_participant::Column::CallingUserId
35 .is_in(stale_participant_user_ids.iter().copied()),
36 )
37 .add(room_participant::Column::AnsweringConnectionId.is_null()),
38 )
39 .all(&*tx)
40 .await?;
41 room_participant::Entity::delete_many()
42 .filter(
43 room_participant::Column::Id
44 .is_in(called_participants.iter().map(|participant| participant.id)),
45 )
46 .exec(&*tx)
47 .await?;
48 canceled_calls_to_user_ids.extend(
49 called_participants
50 .into_iter()
51 .map(|participant| participant.user_id),
52 );
53
54 let (channel, room) = self.get_channel_room(room_id, &tx).await?;
55 if channel.is_none() {
56 // Delete the room if it becomes empty.
57 if room.participants.is_empty() {
58 project::Entity::delete_many()
59 .filter(project::Column::RoomId.eq(room_id))
60 .exec(&*tx)
61 .await?;
62 room::Entity::delete_by_id(room_id).exec(&*tx).await?;
63 }
64 };
65
66 Ok(RefreshedRoom {
67 room,
68 channel,
69 stale_participant_user_ids,
70 canceled_calls_to_user_ids,
71 })
72 })
73 .await
74 }
75
76 /// Returns the incoming calls for user with the given ID.
77 pub async fn incoming_call_for_user(
78 &self,
79 user_id: UserId,
80 ) -> Result<Option<proto::IncomingCall>> {
81 self.transaction(|tx| async move {
82 let pending_participant = room_participant::Entity::find()
83 .filter(
84 room_participant::Column::UserId
85 .eq(user_id)
86 .and(room_participant::Column::AnsweringConnectionId.is_null()),
87 )
88 .one(&*tx)
89 .await?;
90
91 if let Some(pending_participant) = pending_participant {
92 let room = self.get_room(pending_participant.room_id, &tx).await?;
93 Ok(Self::build_incoming_call(&room, user_id))
94 } else {
95 Ok(None)
96 }
97 })
98 .await
99 }
100
101 /// Creates a new room.
102 pub async fn create_room(
103 &self,
104 user_id: UserId,
105 connection: ConnectionId,
106 live_kit_room: &str,
107 ) -> Result<proto::Room> {
108 self.transaction(|tx| async move {
109 let room = room::ActiveModel {
110 live_kit_room: ActiveValue::set(live_kit_room.into()),
111 ..Default::default()
112 }
113 .insert(&*tx)
114 .await?;
115 room_participant::ActiveModel {
116 room_id: ActiveValue::set(room.id),
117 user_id: ActiveValue::set(user_id),
118 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
119 answering_connection_server_id: ActiveValue::set(Some(ServerId(
120 connection.owner_id as i32,
121 ))),
122 answering_connection_lost: ActiveValue::set(false),
123 calling_user_id: ActiveValue::set(user_id),
124 calling_connection_id: ActiveValue::set(connection.id as i32),
125 calling_connection_server_id: ActiveValue::set(Some(ServerId(
126 connection.owner_id as i32,
127 ))),
128 participant_index: ActiveValue::set(Some(0)),
129 role: ActiveValue::set(Some(ChannelRole::Admin)),
130
131 id: ActiveValue::NotSet,
132 location_kind: ActiveValue::NotSet,
133 location_project_id: ActiveValue::NotSet,
134 initial_project_id: ActiveValue::NotSet,
135 }
136 .insert(&*tx)
137 .await?;
138
139 let room = self.get_room(room.id, &tx).await?;
140 Ok(room)
141 })
142 .await
143 }
144
145 pub async fn call(
146 &self,
147 room_id: RoomId,
148 calling_user_id: UserId,
149 calling_connection: ConnectionId,
150 called_user_id: UserId,
151 initial_project_id: Option<ProjectId>,
152 ) -> Result<TransactionGuard<(proto::Room, proto::IncomingCall)>> {
153 self.room_transaction(room_id, |tx| async move {
154 let caller = room_participant::Entity::find()
155 .filter(
156 room_participant::Column::UserId
157 .eq(calling_user_id)
158 .and(room_participant::Column::RoomId.eq(room_id)),
159 )
160 .one(&*tx)
161 .await?
162 .ok_or_else(|| anyhow!("user is not in the room"))?;
163
164 let called_user_role = match caller.role.unwrap_or(ChannelRole::Member) {
165 ChannelRole::Admin | ChannelRole::Member => ChannelRole::Member,
166 ChannelRole::Guest | ChannelRole::Talker => ChannelRole::Guest,
167 ChannelRole::Banned => return Err(anyhow!("banned users cannot invite").into()),
168 };
169
170 room_participant::ActiveModel {
171 room_id: ActiveValue::set(room_id),
172 user_id: ActiveValue::set(called_user_id),
173 answering_connection_lost: ActiveValue::set(false),
174 participant_index: ActiveValue::NotSet,
175 calling_user_id: ActiveValue::set(calling_user_id),
176 calling_connection_id: ActiveValue::set(calling_connection.id as i32),
177 calling_connection_server_id: ActiveValue::set(Some(ServerId(
178 calling_connection.owner_id as i32,
179 ))),
180 initial_project_id: ActiveValue::set(initial_project_id),
181 role: ActiveValue::set(Some(called_user_role)),
182
183 id: ActiveValue::NotSet,
184 answering_connection_id: ActiveValue::NotSet,
185 answering_connection_server_id: ActiveValue::NotSet,
186 location_kind: ActiveValue::NotSet,
187 location_project_id: ActiveValue::NotSet,
188 }
189 .insert(&*tx)
190 .await?;
191
192 let room = self.get_room(room_id, &tx).await?;
193 let incoming_call = Self::build_incoming_call(&room, called_user_id)
194 .ok_or_else(|| anyhow!("failed to build incoming call"))?;
195 Ok((room, incoming_call))
196 })
197 .await
198 }
199
200 pub async fn call_failed(
201 &self,
202 room_id: RoomId,
203 called_user_id: UserId,
204 ) -> Result<TransactionGuard<proto::Room>> {
205 self.room_transaction(room_id, |tx| async move {
206 room_participant::Entity::delete_many()
207 .filter(
208 room_participant::Column::RoomId
209 .eq(room_id)
210 .and(room_participant::Column::UserId.eq(called_user_id)),
211 )
212 .exec(&*tx)
213 .await?;
214 let room = self.get_room(room_id, &tx).await?;
215 Ok(room)
216 })
217 .await
218 }
219
220 pub async fn decline_call(
221 &self,
222 expected_room_id: Option<RoomId>,
223 user_id: UserId,
224 ) -> Result<Option<TransactionGuard<proto::Room>>> {
225 self.optional_room_transaction(|tx| async move {
226 let mut filter = Condition::all()
227 .add(room_participant::Column::UserId.eq(user_id))
228 .add(room_participant::Column::AnsweringConnectionId.is_null());
229 if let Some(room_id) = expected_room_id {
230 filter = filter.add(room_participant::Column::RoomId.eq(room_id));
231 }
232 let participant = room_participant::Entity::find()
233 .filter(filter)
234 .one(&*tx)
235 .await?;
236
237 let participant = if let Some(participant) = participant {
238 participant
239 } else if expected_room_id.is_some() {
240 return Err(anyhow!("could not find call to decline"))?;
241 } else {
242 return Ok(None);
243 };
244
245 let room_id = participant.room_id;
246 room_participant::Entity::delete(participant.into_active_model())
247 .exec(&*tx)
248 .await?;
249
250 let room = self.get_room(room_id, &tx).await?;
251 Ok(Some((room_id, room)))
252 })
253 .await
254 }
255
256 pub async fn cancel_call(
257 &self,
258 room_id: RoomId,
259 calling_connection: ConnectionId,
260 called_user_id: UserId,
261 ) -> Result<TransactionGuard<proto::Room>> {
262 self.room_transaction(room_id, |tx| async move {
263 let participant = room_participant::Entity::find()
264 .filter(
265 Condition::all()
266 .add(room_participant::Column::UserId.eq(called_user_id))
267 .add(room_participant::Column::RoomId.eq(room_id))
268 .add(
269 room_participant::Column::CallingConnectionId
270 .eq(calling_connection.id as i32),
271 )
272 .add(
273 room_participant::Column::CallingConnectionServerId
274 .eq(calling_connection.owner_id as i32),
275 )
276 .add(room_participant::Column::AnsweringConnectionId.is_null()),
277 )
278 .one(&*tx)
279 .await?
280 .ok_or_else(|| anyhow!("no call to cancel"))?;
281
282 room_participant::Entity::delete(participant.into_active_model())
283 .exec(&*tx)
284 .await?;
285
286 let room = self.get_room(room_id, &tx).await?;
287 Ok(room)
288 })
289 .await
290 }
291
292 pub async fn join_room(
293 &self,
294 room_id: RoomId,
295 user_id: UserId,
296 connection: ConnectionId,
297 ) -> Result<TransactionGuard<JoinRoom>> {
298 self.room_transaction(room_id, |tx| async move {
299 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
300 enum QueryChannelId {
301 ChannelId,
302 }
303
304 let channel_id: Option<ChannelId> = room::Entity::find()
305 .select_only()
306 .column(room::Column::ChannelId)
307 .filter(room::Column::Id.eq(room_id))
308 .into_values::<_, QueryChannelId>()
309 .one(&*tx)
310 .await?
311 .ok_or_else(|| anyhow!("no such room"))?;
312
313 if channel_id.is_some() {
314 Err(anyhow!("tried to join channel call directly"))?
315 }
316
317 let participant_index = self
318 .get_next_participant_index_internal(room_id, &tx)
319 .await?;
320
321 let result = room_participant::Entity::update_many()
322 .filter(
323 Condition::all()
324 .add(room_participant::Column::RoomId.eq(room_id))
325 .add(room_participant::Column::UserId.eq(user_id))
326 .add(room_participant::Column::AnsweringConnectionId.is_null()),
327 )
328 .set(room_participant::ActiveModel {
329 participant_index: ActiveValue::Set(Some(participant_index)),
330 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
331 answering_connection_server_id: ActiveValue::set(Some(ServerId(
332 connection.owner_id as i32,
333 ))),
334 answering_connection_lost: ActiveValue::set(false),
335 ..Default::default()
336 })
337 .exec(&*tx)
338 .await?;
339 if result.rows_affected == 0 {
340 Err(anyhow!("room does not exist or was already joined"))?;
341 }
342
343 let room = self.get_room(room_id, &tx).await?;
344 Ok(JoinRoom {
345 room,
346 channel: None,
347 })
348 })
349 .await
350 }
351
352 pub async fn stale_room_connection(&self, user_id: UserId) -> Result<Option<ConnectionId>> {
353 self.transaction(|tx| async move {
354 let participant = room_participant::Entity::find()
355 .filter(room_participant::Column::UserId.eq(user_id))
356 .one(&*tx)
357 .await?;
358 Ok(participant.and_then(|p| p.answering_connection()))
359 })
360 .await
361 }
362
363 async fn get_next_participant_index_internal(
364 &self,
365 room_id: RoomId,
366 tx: &DatabaseTransaction,
367 ) -> Result<i32> {
368 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
369 enum QueryParticipantIndices {
370 ParticipantIndex,
371 }
372 let existing_participant_indices: Vec<i32> = room_participant::Entity::find()
373 .filter(
374 room_participant::Column::RoomId
375 .eq(room_id)
376 .and(room_participant::Column::ParticipantIndex.is_not_null()),
377 )
378 .select_only()
379 .column(room_participant::Column::ParticipantIndex)
380 .into_values::<_, QueryParticipantIndices>()
381 .all(tx)
382 .await?;
383
384 let mut participant_index = 0;
385 while existing_participant_indices.contains(&participant_index) {
386 participant_index += 1;
387 }
388
389 Ok(participant_index)
390 }
391
392 /// Returns the channel ID for the given room, if it has one.
393 pub async fn channel_id_for_room(&self, room_id: RoomId) -> Result<Option<ChannelId>> {
394 self.transaction(|tx| async move {
395 let room: Option<room::Model> = room::Entity::find()
396 .filter(room::Column::Id.eq(room_id))
397 .one(&*tx)
398 .await?;
399
400 Ok(room.and_then(|room| room.channel_id))
401 })
402 .await
403 }
404
405 pub(crate) async fn join_channel_room_internal(
406 &self,
407 room_id: RoomId,
408 user_id: UserId,
409 connection: ConnectionId,
410 role: ChannelRole,
411 tx: &DatabaseTransaction,
412 ) -> Result<JoinRoom> {
413 let participant_index = self
414 .get_next_participant_index_internal(room_id, tx)
415 .await?;
416
417 // If someone has been invited into the room, accept the invite instead of inserting
418 let result = room_participant::Entity::update_many()
419 .filter(
420 Condition::all()
421 .add(room_participant::Column::RoomId.eq(room_id))
422 .add(room_participant::Column::UserId.eq(user_id))
423 .add(room_participant::Column::AnsweringConnectionId.is_null()),
424 )
425 .set(room_participant::ActiveModel {
426 participant_index: ActiveValue::Set(Some(participant_index)),
427 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
428 answering_connection_server_id: ActiveValue::set(Some(ServerId(
429 connection.owner_id as i32,
430 ))),
431 answering_connection_lost: ActiveValue::set(false),
432 ..Default::default()
433 })
434 .exec(tx)
435 .await?;
436
437 if result.rows_affected == 0 {
438 room_participant::Entity::insert(room_participant::ActiveModel {
439 room_id: ActiveValue::set(room_id),
440 user_id: ActiveValue::set(user_id),
441 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
442 answering_connection_server_id: ActiveValue::set(Some(ServerId(
443 connection.owner_id as i32,
444 ))),
445 answering_connection_lost: ActiveValue::set(false),
446 calling_user_id: ActiveValue::set(user_id),
447 calling_connection_id: ActiveValue::set(connection.id as i32),
448 calling_connection_server_id: ActiveValue::set(Some(ServerId(
449 connection.owner_id as i32,
450 ))),
451 participant_index: ActiveValue::Set(Some(participant_index)),
452 role: ActiveValue::set(Some(role)),
453 id: ActiveValue::NotSet,
454 location_kind: ActiveValue::NotSet,
455 location_project_id: ActiveValue::NotSet,
456 initial_project_id: ActiveValue::NotSet,
457 })
458 .exec(tx)
459 .await?;
460 }
461
462 let (channel, room) = self.get_channel_room(room_id, tx).await?;
463 let channel = channel.ok_or_else(|| anyhow!("no channel for room"))?;
464 Ok(JoinRoom {
465 room,
466 channel: Some(channel),
467 })
468 }
469
470 pub async fn rejoin_room(
471 &self,
472 rejoin_room: proto::RejoinRoom,
473 user_id: UserId,
474 connection: ConnectionId,
475 ) -> Result<TransactionGuard<RejoinedRoom>> {
476 let room_id = RoomId::from_proto(rejoin_room.id);
477 self.room_transaction(room_id, |tx| async {
478 let tx = tx;
479 let participant_update = room_participant::Entity::update_many()
480 .filter(
481 Condition::all()
482 .add(room_participant::Column::RoomId.eq(room_id))
483 .add(room_participant::Column::UserId.eq(user_id))
484 .add(room_participant::Column::AnsweringConnectionId.is_not_null()),
485 )
486 .set(room_participant::ActiveModel {
487 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
488 answering_connection_server_id: ActiveValue::set(Some(ServerId(
489 connection.owner_id as i32,
490 ))),
491 answering_connection_lost: ActiveValue::set(false),
492 ..Default::default()
493 })
494 .exec(&*tx)
495 .await?;
496 if participant_update.rows_affected == 0 {
497 return Err(anyhow!("room does not exist or was already joined"))?;
498 }
499
500 let mut reshared_projects = Vec::new();
501 for reshared_project in &rejoin_room.reshared_projects {
502 let project_id = ProjectId::from_proto(reshared_project.project_id);
503 let project = project::Entity::find_by_id(project_id)
504 .one(&*tx)
505 .await?
506 .ok_or_else(|| anyhow!("project does not exist"))?;
507 if project.host_user_id != Some(user_id) {
508 return Err(anyhow!("no such project"))?;
509 }
510
511 let mut collaborators = project
512 .find_related(project_collaborator::Entity)
513 .all(&*tx)
514 .await?;
515 let host_ix = collaborators
516 .iter()
517 .position(|collaborator| {
518 collaborator.user_id == user_id && collaborator.is_host
519 })
520 .ok_or_else(|| anyhow!("host not found among collaborators"))?;
521 let host = collaborators.swap_remove(host_ix);
522 let old_connection_id = host.connection();
523
524 project::Entity::update(project::ActiveModel {
525 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
526 host_connection_server_id: ActiveValue::set(Some(ServerId(
527 connection.owner_id as i32,
528 ))),
529 ..project.into_active_model()
530 })
531 .exec(&*tx)
532 .await?;
533 project_collaborator::Entity::update(project_collaborator::ActiveModel {
534 connection_id: ActiveValue::set(connection.id as i32),
535 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
536 ..host.into_active_model()
537 })
538 .exec(&*tx)
539 .await?;
540
541 self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
542 .await?;
543
544 reshared_projects.push(ResharedProject {
545 id: project_id,
546 old_connection_id,
547 collaborators: collaborators
548 .iter()
549 .map(|collaborator| ProjectCollaborator {
550 connection_id: collaborator.connection(),
551 user_id: collaborator.user_id,
552 replica_id: collaborator.replica_id,
553 is_host: collaborator.is_host,
554 })
555 .collect(),
556 worktrees: reshared_project.worktrees.clone(),
557 });
558 }
559
560 project::Entity::delete_many()
561 .filter(
562 Condition::all()
563 .add(project::Column::RoomId.eq(room_id))
564 .add(project::Column::HostUserId.eq(user_id))
565 .add(
566 project::Column::Id
567 .is_not_in(reshared_projects.iter().map(|project| project.id)),
568 ),
569 )
570 .exec(&*tx)
571 .await?;
572
573 let mut rejoined_projects = Vec::new();
574 for rejoined_project in &rejoin_room.rejoined_projects {
575 if let Some(rejoined_project) = self
576 .rejoin_project_internal(&tx, rejoined_project, user_id, connection)
577 .await?
578 {
579 rejoined_projects.push(rejoined_project);
580 }
581 }
582
583 let (channel, room) = self.get_channel_room(room_id, &tx).await?;
584
585 Ok(RejoinedRoom {
586 room,
587 channel,
588 rejoined_projects,
589 reshared_projects,
590 })
591 })
592 .await
593 }
594
595 pub async fn rejoin_project_internal(
596 &self,
597 tx: &DatabaseTransaction,
598 rejoined_project: &proto::RejoinProject,
599 user_id: UserId,
600 connection: ConnectionId,
601 ) -> Result<Option<RejoinedProject>> {
602 let project_id = ProjectId::from_proto(rejoined_project.id);
603 let Some(project) = project::Entity::find_by_id(project_id).one(tx).await? else {
604 return Ok(None);
605 };
606
607 let mut worktrees = Vec::new();
608 let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
609 for db_worktree in db_worktrees {
610 let mut worktree = RejoinedWorktree {
611 id: db_worktree.id as u64,
612 abs_path: db_worktree.abs_path,
613 root_name: db_worktree.root_name,
614 visible: db_worktree.visible,
615 updated_entries: Default::default(),
616 removed_entries: Default::default(),
617 updated_repositories: Default::default(),
618 removed_repositories: Default::default(),
619 diagnostic_summaries: Default::default(),
620 settings_files: Default::default(),
621 scan_id: db_worktree.scan_id as u64,
622 completed_scan_id: db_worktree.completed_scan_id as u64,
623 };
624
625 let rejoined_worktree = rejoined_project
626 .worktrees
627 .iter()
628 .find(|worktree| worktree.id == db_worktree.id as u64);
629
630 // File entries
631 {
632 let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
633 worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
634 } else {
635 worktree_entry::Column::IsDeleted.eq(false)
636 };
637
638 let mut db_entries = worktree_entry::Entity::find()
639 .filter(
640 Condition::all()
641 .add(worktree_entry::Column::ProjectId.eq(project.id))
642 .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
643 .add(entry_filter),
644 )
645 .stream(tx)
646 .await?;
647
648 while let Some(db_entry) = db_entries.next().await {
649 let db_entry = db_entry?;
650 if db_entry.is_deleted {
651 worktree.removed_entries.push(db_entry.id as u64);
652 } else {
653 worktree.updated_entries.push(proto::Entry {
654 id: db_entry.id as u64,
655 is_dir: db_entry.is_dir,
656 path: db_entry.path,
657 inode: db_entry.inode as u64,
658 mtime: Some(proto::Timestamp {
659 seconds: db_entry.mtime_seconds as u64,
660 nanos: db_entry.mtime_nanos as u32,
661 }),
662 is_symlink: db_entry.is_symlink,
663 is_ignored: db_entry.is_ignored,
664 is_external: db_entry.is_external,
665 git_status: db_entry.git_status.map(|status| status as i32),
666 // This is only used in the summarization backlog, so if it's None,
667 // that just means we won't be able to detect when to resummarize
668 // based on total number of backlogged bytes - instead, we'd go
669 // on number of files only. That shouldn't be a huge deal in practice.
670 size: None,
671 is_fifo: db_entry.is_fifo,
672 });
673 }
674 }
675 }
676
677 // Repository Entries
678 {
679 let repository_entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
680 worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
681 } else {
682 worktree_repository::Column::IsDeleted.eq(false)
683 };
684
685 let mut db_repositories = worktree_repository::Entity::find()
686 .filter(
687 Condition::all()
688 .add(worktree_repository::Column::ProjectId.eq(project.id))
689 .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
690 .add(repository_entry_filter),
691 )
692 .stream(tx)
693 .await?;
694
695 while let Some(db_repository) = db_repositories.next().await {
696 let db_repository = db_repository?;
697 if db_repository.is_deleted {
698 worktree
699 .removed_repositories
700 .push(db_repository.work_directory_id as u64);
701 } else {
702 worktree.updated_repositories.push(proto::RepositoryEntry {
703 work_directory_id: db_repository.work_directory_id as u64,
704 branch: db_repository.branch,
705 });
706 }
707 }
708 }
709
710 worktrees.push(worktree);
711 }
712
713 let language_servers = project
714 .find_related(language_server::Entity)
715 .all(tx)
716 .await?
717 .into_iter()
718 .map(|language_server| proto::LanguageServer {
719 id: language_server.id as u64,
720 name: language_server.name,
721 worktree_id: None,
722 })
723 .collect::<Vec<_>>();
724
725 {
726 let mut db_settings_files = worktree_settings_file::Entity::find()
727 .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
728 .stream(tx)
729 .await?;
730 while let Some(db_settings_file) = db_settings_files.next().await {
731 let db_settings_file = db_settings_file?;
732 if let Some(worktree) = worktrees
733 .iter_mut()
734 .find(|w| w.id == db_settings_file.worktree_id as u64)
735 {
736 worktree.settings_files.push(WorktreeSettingsFile {
737 path: db_settings_file.path,
738 content: db_settings_file.content,
739 kind: db_settings_file.kind,
740 });
741 }
742 }
743 }
744
745 let mut collaborators = project
746 .find_related(project_collaborator::Entity)
747 .all(tx)
748 .await?;
749 let self_collaborator = if let Some(self_collaborator_ix) = collaborators
750 .iter()
751 .position(|collaborator| collaborator.user_id == user_id)
752 {
753 collaborators.swap_remove(self_collaborator_ix)
754 } else {
755 return Ok(None);
756 };
757 let old_connection_id = self_collaborator.connection();
758 project_collaborator::Entity::update(project_collaborator::ActiveModel {
759 connection_id: ActiveValue::set(connection.id as i32),
760 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
761 ..self_collaborator.into_active_model()
762 })
763 .exec(tx)
764 .await?;
765
766 let collaborators = collaborators
767 .into_iter()
768 .map(|collaborator| ProjectCollaborator {
769 connection_id: collaborator.connection(),
770 user_id: collaborator.user_id,
771 replica_id: collaborator.replica_id,
772 is_host: collaborator.is_host,
773 })
774 .collect::<Vec<_>>();
775
776 Ok(Some(RejoinedProject {
777 id: project_id,
778 old_connection_id,
779 collaborators,
780 worktrees,
781 language_servers,
782 }))
783 }
784
785 pub async fn leave_room(
786 &self,
787 connection: ConnectionId,
788 ) -> Result<Option<TransactionGuard<LeftRoom>>> {
789 self.optional_room_transaction(|tx| async move {
790 let leaving_participant = room_participant::Entity::find()
791 .filter(
792 Condition::all()
793 .add(
794 room_participant::Column::AnsweringConnectionId
795 .eq(connection.id as i32),
796 )
797 .add(
798 room_participant::Column::AnsweringConnectionServerId
799 .eq(connection.owner_id as i32),
800 ),
801 )
802 .one(&*tx)
803 .await?;
804
805 if let Some(leaving_participant) = leaving_participant {
806 // Leave room.
807 let room_id = leaving_participant.room_id;
808 room_participant::Entity::delete_by_id(leaving_participant.id)
809 .exec(&*tx)
810 .await?;
811
812 // Cancel pending calls initiated by the leaving user.
813 let called_participants = room_participant::Entity::find()
814 .filter(
815 Condition::all()
816 .add(
817 room_participant::Column::CallingUserId
818 .eq(leaving_participant.user_id),
819 )
820 .add(room_participant::Column::AnsweringConnectionId.is_null()),
821 )
822 .all(&*tx)
823 .await?;
824 room_participant::Entity::delete_many()
825 .filter(
826 room_participant::Column::Id
827 .is_in(called_participants.iter().map(|participant| participant.id)),
828 )
829 .exec(&*tx)
830 .await?;
831 let canceled_calls_to_user_ids = called_participants
832 .into_iter()
833 .map(|participant| participant.user_id)
834 .collect();
835
836 // Detect left projects.
837 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
838 enum QueryProjectIds {
839 ProjectId,
840 }
841 let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
842 .select_only()
843 .column_as(
844 project_collaborator::Column::ProjectId,
845 QueryProjectIds::ProjectId,
846 )
847 .filter(
848 Condition::all()
849 .add(
850 project_collaborator::Column::ConnectionId.eq(connection.id as i32),
851 )
852 .add(
853 project_collaborator::Column::ConnectionServerId
854 .eq(connection.owner_id as i32),
855 ),
856 )
857 .into_values::<_, QueryProjectIds>()
858 .all(&*tx)
859 .await?;
860
861 // if any project in the room has a remote-project-id that belongs to a dev server that this user owns.
862 let dev_server_projects_for_user = self
863 .dev_server_project_ids_for_user(leaving_participant.user_id, &tx)
864 .await?;
865
866 let dev_server_projects_to_unshare = project::Entity::find()
867 .filter(
868 Condition::all()
869 .add(project::Column::RoomId.eq(room_id))
870 .add(
871 project::Column::DevServerProjectId
872 .is_in(dev_server_projects_for_user.clone()),
873 ),
874 )
875 .all(&*tx)
876 .await?
877 .into_iter()
878 .map(|project| project.id)
879 .collect::<HashSet<_>>();
880 let mut left_projects = HashMap::default();
881 let mut collaborators = project_collaborator::Entity::find()
882 .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
883 .stream(&*tx)
884 .await?;
885
886 while let Some(collaborator) = collaborators.next().await {
887 let collaborator = collaborator?;
888 let left_project =
889 left_projects
890 .entry(collaborator.project_id)
891 .or_insert(LeftProject {
892 id: collaborator.project_id,
893 connection_ids: Default::default(),
894 should_unshare: false,
895 });
896
897 let collaborator_connection_id = collaborator.connection();
898 if collaborator_connection_id != connection {
899 left_project.connection_ids.push(collaborator_connection_id);
900 }
901
902 if (collaborator.is_host && collaborator.connection() == connection)
903 || dev_server_projects_to_unshare.contains(&collaborator.project_id)
904 {
905 left_project.should_unshare = true;
906 }
907 }
908 drop(collaborators);
909
910 // Leave projects.
911 project_collaborator::Entity::delete_many()
912 .filter(
913 Condition::all()
914 .add(
915 project_collaborator::Column::ConnectionId.eq(connection.id as i32),
916 )
917 .add(
918 project_collaborator::Column::ConnectionServerId
919 .eq(connection.owner_id as i32),
920 ),
921 )
922 .exec(&*tx)
923 .await?;
924
925 follower::Entity::delete_many()
926 .filter(
927 Condition::all()
928 .add(follower::Column::FollowerConnectionId.eq(connection.id as i32)),
929 )
930 .exec(&*tx)
931 .await?;
932
933 // Unshare projects.
934 project::Entity::delete_many()
935 .filter(
936 Condition::all()
937 .add(project::Column::RoomId.eq(room_id))
938 .add(project::Column::HostConnectionId.eq(connection.id as i32))
939 .add(
940 project::Column::HostConnectionServerId
941 .eq(connection.owner_id as i32),
942 ),
943 )
944 .exec(&*tx)
945 .await?;
946
947 if !dev_server_projects_to_unshare.is_empty() {
948 project::Entity::update_many()
949 .filter(project::Column::Id.is_in(dev_server_projects_to_unshare))
950 .set(project::ActiveModel {
951 room_id: ActiveValue::Set(None),
952 ..Default::default()
953 })
954 .exec(&*tx)
955 .await?;
956 }
957
958 let (channel, room) = self.get_channel_room(room_id, &tx).await?;
959 let deleted = if room.participants.is_empty() {
960 let result = room::Entity::delete_by_id(room_id).exec(&*tx).await?;
961 result.rows_affected > 0
962 } else {
963 false
964 };
965
966 let left_room = LeftRoom {
967 room,
968 channel,
969 left_projects,
970 canceled_calls_to_user_ids,
971 deleted,
972 };
973
974 if left_room.room.participants.is_empty() {
975 self.rooms.remove(&room_id);
976 }
977
978 Ok(Some((room_id, left_room)))
979 } else {
980 Ok(None)
981 }
982 })
983 .await
984 }
985
986 /// Updates the location of a participant in the given room.
987 pub async fn update_room_participant_location(
988 &self,
989 room_id: RoomId,
990 connection: ConnectionId,
991 location: proto::ParticipantLocation,
992 ) -> Result<TransactionGuard<proto::Room>> {
993 self.room_transaction(room_id, |tx| async {
994 let tx = tx;
995 let location_kind;
996 let location_project_id;
997 match location
998 .variant
999 .as_ref()
1000 .ok_or_else(|| anyhow!("invalid location"))?
1001 {
1002 proto::participant_location::Variant::SharedProject(project) => {
1003 location_kind = 0;
1004 location_project_id = Some(ProjectId::from_proto(project.id));
1005 }
1006 proto::participant_location::Variant::UnsharedProject(_) => {
1007 location_kind = 1;
1008 location_project_id = None;
1009 }
1010 proto::participant_location::Variant::External(_) => {
1011 location_kind = 2;
1012 location_project_id = None;
1013 }
1014 }
1015
1016 let result = room_participant::Entity::update_many()
1017 .filter(
1018 Condition::all()
1019 .add(room_participant::Column::RoomId.eq(room_id))
1020 .add(
1021 room_participant::Column::AnsweringConnectionId
1022 .eq(connection.id as i32),
1023 )
1024 .add(
1025 room_participant::Column::AnsweringConnectionServerId
1026 .eq(connection.owner_id as i32),
1027 ),
1028 )
1029 .set(room_participant::ActiveModel {
1030 location_kind: ActiveValue::set(Some(location_kind)),
1031 location_project_id: ActiveValue::set(location_project_id),
1032 ..Default::default()
1033 })
1034 .exec(&*tx)
1035 .await?;
1036
1037 if result.rows_affected == 1 {
1038 let room = self.get_room(room_id, &tx).await?;
1039 Ok(room)
1040 } else {
1041 Err(anyhow!("could not update room participant location"))?
1042 }
1043 })
1044 .await
1045 }
1046
1047 /// Sets the role of a participant in the given room.
1048 pub async fn set_room_participant_role(
1049 &self,
1050 admin_id: UserId,
1051 room_id: RoomId,
1052 user_id: UserId,
1053 role: ChannelRole,
1054 ) -> Result<TransactionGuard<proto::Room>> {
1055 self.room_transaction(room_id, |tx| async move {
1056 room_participant::Entity::find()
1057 .filter(
1058 Condition::all()
1059 .add(room_participant::Column::RoomId.eq(room_id))
1060 .add(room_participant::Column::UserId.eq(admin_id))
1061 .add(room_participant::Column::Role.eq(ChannelRole::Admin)),
1062 )
1063 .one(&*tx)
1064 .await?
1065 .ok_or_else(|| anyhow!("only admins can set participant role"))?;
1066
1067 if role.requires_cla() {
1068 self.check_user_has_signed_cla(user_id, room_id, &tx)
1069 .await?;
1070 }
1071
1072 let result = room_participant::Entity::update_many()
1073 .filter(
1074 Condition::all()
1075 .add(room_participant::Column::RoomId.eq(room_id))
1076 .add(room_participant::Column::UserId.eq(user_id)),
1077 )
1078 .set(room_participant::ActiveModel {
1079 role: ActiveValue::set(Some(role)),
1080 ..Default::default()
1081 })
1082 .exec(&*tx)
1083 .await?;
1084
1085 if result.rows_affected != 1 {
1086 Err(anyhow!("could not update room participant role"))?;
1087 }
1088 self.get_room(room_id, &tx).await
1089 })
1090 .await
1091 }
1092
1093 async fn check_user_has_signed_cla(
1094 &self,
1095 user_id: UserId,
1096 room_id: RoomId,
1097 tx: &DatabaseTransaction,
1098 ) -> Result<()> {
1099 let channel = room::Entity::find_by_id(room_id)
1100 .one(tx)
1101 .await?
1102 .ok_or_else(|| anyhow!("could not find room"))?
1103 .find_related(channel::Entity)
1104 .one(tx)
1105 .await?;
1106
1107 if let Some(channel) = channel {
1108 let requires_zed_cla = channel.requires_zed_cla
1109 || channel::Entity::find()
1110 .filter(
1111 channel::Column::Id
1112 .is_in(channel.ancestors())
1113 .and(channel::Column::RequiresZedCla.eq(true)),
1114 )
1115 .count(tx)
1116 .await?
1117 > 0;
1118 if requires_zed_cla
1119 && contributor::Entity::find()
1120 .filter(contributor::Column::UserId.eq(user_id))
1121 .one(tx)
1122 .await?
1123 .is_none()
1124 {
1125 Err(anyhow!("user has not signed the Zed CLA"))?;
1126 }
1127 }
1128 Ok(())
1129 }
1130
1131 pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
1132 self.transaction(|tx| async move {
1133 self.room_connection_lost(connection, &tx).await?;
1134 self.channel_buffer_connection_lost(connection, &tx).await?;
1135 self.channel_chat_connection_lost(connection, &tx).await?;
1136 Ok(())
1137 })
1138 .await
1139 }
1140
1141 pub async fn room_connection_lost(
1142 &self,
1143 connection: ConnectionId,
1144 tx: &DatabaseTransaction,
1145 ) -> Result<()> {
1146 let participant = room_participant::Entity::find()
1147 .filter(
1148 Condition::all()
1149 .add(room_participant::Column::AnsweringConnectionId.eq(connection.id as i32))
1150 .add(
1151 room_participant::Column::AnsweringConnectionServerId
1152 .eq(connection.owner_id as i32),
1153 ),
1154 )
1155 .one(tx)
1156 .await?;
1157
1158 if let Some(participant) = participant {
1159 room_participant::Entity::update(room_participant::ActiveModel {
1160 answering_connection_lost: ActiveValue::set(true),
1161 ..participant.into_active_model()
1162 })
1163 .exec(tx)
1164 .await?;
1165 }
1166 Ok(())
1167 }
1168
1169 fn build_incoming_call(
1170 room: &proto::Room,
1171 called_user_id: UserId,
1172 ) -> Option<proto::IncomingCall> {
1173 let pending_participant = room
1174 .pending_participants
1175 .iter()
1176 .find(|participant| participant.user_id == called_user_id.to_proto())?;
1177
1178 Some(proto::IncomingCall {
1179 room_id: room.id,
1180 calling_user_id: pending_participant.calling_user_id,
1181 participant_user_ids: room
1182 .participants
1183 .iter()
1184 .map(|participant| participant.user_id)
1185 .collect(),
1186 initial_project: room.participants.iter().find_map(|participant| {
1187 let initial_project_id = pending_participant.initial_project_id?;
1188 participant
1189 .projects
1190 .iter()
1191 .find(|project| project.id == initial_project_id)
1192 .cloned()
1193 }),
1194 })
1195 }
1196
1197 pub async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1198 let (_, room) = self.get_channel_room(room_id, tx).await?;
1199 Ok(room)
1200 }
1201
1202 pub async fn room_connection_ids(
1203 &self,
1204 room_id: RoomId,
1205 connection_id: ConnectionId,
1206 ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1207 self.room_transaction(room_id, |tx| async move {
1208 let mut participants = room_participant::Entity::find()
1209 .filter(room_participant::Column::RoomId.eq(room_id))
1210 .stream(&*tx)
1211 .await?;
1212
1213 let mut is_participant = false;
1214 let mut connection_ids = HashSet::default();
1215 while let Some(participant) = participants.next().await {
1216 let participant = participant?;
1217 if let Some(answering_connection) = participant.answering_connection() {
1218 if answering_connection == connection_id {
1219 is_participant = true;
1220 } else {
1221 connection_ids.insert(answering_connection);
1222 }
1223 }
1224 }
1225
1226 if !is_participant {
1227 Err(anyhow!("not a room participant"))?;
1228 }
1229
1230 Ok(connection_ids)
1231 })
1232 .await
1233 }
1234
1235 async fn get_channel_room(
1236 &self,
1237 room_id: RoomId,
1238 tx: &DatabaseTransaction,
1239 ) -> Result<(Option<channel::Model>, proto::Room)> {
1240 let db_room = room::Entity::find_by_id(room_id)
1241 .one(tx)
1242 .await?
1243 .ok_or_else(|| anyhow!("could not find room"))?;
1244
1245 let mut db_participants = db_room
1246 .find_related(room_participant::Entity)
1247 .stream(tx)
1248 .await?;
1249 let mut participants = HashMap::default();
1250 let mut pending_participants = Vec::new();
1251 while let Some(db_participant) = db_participants.next().await {
1252 let db_participant = db_participant?;
1253 if let (
1254 Some(answering_connection_id),
1255 Some(answering_connection_server_id),
1256 Some(participant_index),
1257 ) = (
1258 db_participant.answering_connection_id,
1259 db_participant.answering_connection_server_id,
1260 db_participant.participant_index,
1261 ) {
1262 let location = match (
1263 db_participant.location_kind,
1264 db_participant.location_project_id,
1265 ) {
1266 (Some(0), Some(project_id)) => {
1267 Some(proto::participant_location::Variant::SharedProject(
1268 proto::participant_location::SharedProject {
1269 id: project_id.to_proto(),
1270 },
1271 ))
1272 }
1273 (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1274 Default::default(),
1275 )),
1276 _ => Some(proto::participant_location::Variant::External(
1277 Default::default(),
1278 )),
1279 };
1280
1281 let answering_connection = ConnectionId {
1282 owner_id: answering_connection_server_id.0 as u32,
1283 id: answering_connection_id as u32,
1284 };
1285 participants.insert(
1286 answering_connection,
1287 proto::Participant {
1288 user_id: db_participant.user_id.to_proto(),
1289 peer_id: Some(answering_connection.into()),
1290 projects: Default::default(),
1291 location: Some(proto::ParticipantLocation { variant: location }),
1292 participant_index: participant_index as u32,
1293 role: db_participant.role.unwrap_or(ChannelRole::Member).into(),
1294 },
1295 );
1296 } else {
1297 pending_participants.push(proto::PendingParticipant {
1298 user_id: db_participant.user_id.to_proto(),
1299 calling_user_id: db_participant.calling_user_id.to_proto(),
1300 initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1301 });
1302 }
1303 }
1304 drop(db_participants);
1305
1306 let db_projects = db_room
1307 .find_related(project::Entity)
1308 .find_with_related(worktree::Entity)
1309 .all(tx)
1310 .await?;
1311
1312 for (db_project, db_worktrees) in db_projects {
1313 let host_connection = db_project.host_connection()?;
1314 if let Some(participant) = participants.get_mut(&host_connection) {
1315 participant.projects.push(proto::ParticipantProject {
1316 id: db_project.id.to_proto(),
1317 worktree_root_names: Default::default(),
1318 });
1319 let project = participant.projects.last_mut().unwrap();
1320
1321 for db_worktree in db_worktrees {
1322 if db_worktree.visible {
1323 project.worktree_root_names.push(db_worktree.root_name);
1324 }
1325 }
1326 } else if let Some(dev_server_project_id) = db_project.dev_server_project_id {
1327 let host = self
1328 .owner_for_dev_server_project(dev_server_project_id, tx)
1329 .await?;
1330 if let Some((_, participant)) = participants
1331 .iter_mut()
1332 .find(|(_, v)| v.user_id == host.to_proto())
1333 {
1334 participant.projects.push(proto::ParticipantProject {
1335 id: db_project.id.to_proto(),
1336 worktree_root_names: Default::default(),
1337 });
1338 let project = participant.projects.last_mut().unwrap();
1339
1340 for db_worktree in db_worktrees {
1341 if db_worktree.visible {
1342 project.worktree_root_names.push(db_worktree.root_name);
1343 }
1344 }
1345 }
1346 }
1347 }
1348
1349 let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
1350 let mut followers = Vec::new();
1351 while let Some(db_follower) = db_followers.next().await {
1352 let db_follower = db_follower?;
1353 followers.push(proto::Follower {
1354 leader_id: Some(db_follower.leader_connection().into()),
1355 follower_id: Some(db_follower.follower_connection().into()),
1356 project_id: db_follower.project_id.to_proto(),
1357 });
1358 }
1359 drop(db_followers);
1360
1361 let channel = if let Some(channel_id) = db_room.channel_id {
1362 Some(self.get_channel_internal(channel_id, tx).await?)
1363 } else {
1364 None
1365 };
1366
1367 Ok((
1368 channel,
1369 proto::Room {
1370 id: db_room.id.to_proto(),
1371 live_kit_room: db_room.live_kit_room,
1372 participants: participants.into_values().collect(),
1373 pending_participants,
1374 followers,
1375 },
1376 ))
1377 }
1378}