1use super::*;
2
3impl Database {
4 /// Clears all room participants in rooms attached to a stale server.
5 pub async fn clear_stale_room_participants(
6 &self,
7 room_id: RoomId,
8 new_server_id: ServerId,
9 ) -> Result<RoomGuard<RefreshedRoom>> {
10 self.room_transaction(room_id, |tx| async move {
11 let stale_participant_filter = Condition::all()
12 .add(room_participant::Column::RoomId.eq(room_id))
13 .add(room_participant::Column::AnsweringConnectionId.is_not_null())
14 .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
15
16 let stale_participant_user_ids = room_participant::Entity::find()
17 .filter(stale_participant_filter.clone())
18 .all(&*tx)
19 .await?
20 .into_iter()
21 .map(|participant| participant.user_id)
22 .collect::<Vec<_>>();
23
24 // Delete participants who failed to reconnect and cancel their calls.
25 let mut canceled_calls_to_user_ids = Vec::new();
26 room_participant::Entity::delete_many()
27 .filter(stale_participant_filter)
28 .exec(&*tx)
29 .await?;
30 let called_participants = room_participant::Entity::find()
31 .filter(
32 Condition::all()
33 .add(
34 room_participant::Column::CallingUserId
35 .is_in(stale_participant_user_ids.iter().copied()),
36 )
37 .add(room_participant::Column::AnsweringConnectionId.is_null()),
38 )
39 .all(&*tx)
40 .await?;
41 room_participant::Entity::delete_many()
42 .filter(
43 room_participant::Column::Id
44 .is_in(called_participants.iter().map(|participant| participant.id)),
45 )
46 .exec(&*tx)
47 .await?;
48 canceled_calls_to_user_ids.extend(
49 called_participants
50 .into_iter()
51 .map(|participant| participant.user_id),
52 );
53
54 let (channel, room) = self.get_channel_room(room_id, &tx).await?;
55 let channel_members;
56 if let Some(channel) = &channel {
57 channel_members = self.get_channel_participants(channel, &tx).await?;
58 } else {
59 channel_members = Vec::new();
60
61 // Delete the room if it becomes empty.
62 if room.participants.is_empty() {
63 project::Entity::delete_many()
64 .filter(project::Column::RoomId.eq(room_id))
65 .exec(&*tx)
66 .await?;
67 room::Entity::delete_by_id(room_id).exec(&*tx).await?;
68 }
69 };
70
71 Ok(RefreshedRoom {
72 room,
73 channel_id: channel.map(|channel| channel.id),
74 channel_members,
75 stale_participant_user_ids,
76 canceled_calls_to_user_ids,
77 })
78 })
79 .await
80 }
81
82 /// Returns the incoming calls for user with the given ID.
83 pub async fn incoming_call_for_user(
84 &self,
85 user_id: UserId,
86 ) -> Result<Option<proto::IncomingCall>> {
87 self.transaction(|tx| async move {
88 let pending_participant = room_participant::Entity::find()
89 .filter(
90 room_participant::Column::UserId
91 .eq(user_id)
92 .and(room_participant::Column::AnsweringConnectionId.is_null()),
93 )
94 .one(&*tx)
95 .await?;
96
97 if let Some(pending_participant) = pending_participant {
98 let room = self.get_room(pending_participant.room_id, &tx).await?;
99 Ok(Self::build_incoming_call(&room, user_id))
100 } else {
101 Ok(None)
102 }
103 })
104 .await
105 }
106
107 /// Creates a new room.
108 pub async fn create_room(
109 &self,
110 user_id: UserId,
111 connection: ConnectionId,
112 live_kit_room: &str,
113 release_channel: &str,
114 ) -> Result<proto::Room> {
115 self.transaction(|tx| async move {
116 let room = room::ActiveModel {
117 live_kit_room: ActiveValue::set(live_kit_room.into()),
118 environment: ActiveValue::set(Some(release_channel.to_string())),
119 ..Default::default()
120 }
121 .insert(&*tx)
122 .await?;
123 room_participant::ActiveModel {
124 room_id: ActiveValue::set(room.id),
125 user_id: ActiveValue::set(user_id),
126 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
127 answering_connection_server_id: ActiveValue::set(Some(ServerId(
128 connection.owner_id as i32,
129 ))),
130 answering_connection_lost: ActiveValue::set(false),
131 calling_user_id: ActiveValue::set(user_id),
132 calling_connection_id: ActiveValue::set(connection.id as i32),
133 calling_connection_server_id: ActiveValue::set(Some(ServerId(
134 connection.owner_id as i32,
135 ))),
136 participant_index: ActiveValue::set(Some(0)),
137 role: ActiveValue::set(Some(ChannelRole::Admin)),
138
139 id: ActiveValue::NotSet,
140 location_kind: ActiveValue::NotSet,
141 location_project_id: ActiveValue::NotSet,
142 initial_project_id: ActiveValue::NotSet,
143 }
144 .insert(&*tx)
145 .await?;
146
147 let room = self.get_room(room.id, &tx).await?;
148 Ok(room)
149 })
150 .await
151 }
152
153 pub async fn call(
154 &self,
155 room_id: RoomId,
156 calling_user_id: UserId,
157 calling_connection: ConnectionId,
158 called_user_id: UserId,
159 initial_project_id: Option<ProjectId>,
160 ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
161 self.room_transaction(room_id, |tx| async move {
162 let caller = room_participant::Entity::find()
163 .filter(
164 room_participant::Column::UserId
165 .eq(calling_user_id)
166 .and(room_participant::Column::RoomId.eq(room_id)),
167 )
168 .one(&*tx)
169 .await?
170 .ok_or_else(|| anyhow!("user is not in the room"))?;
171
172 let called_user_role = match caller.role.unwrap_or(ChannelRole::Member) {
173 ChannelRole::Admin | ChannelRole::Member => ChannelRole::Member,
174 ChannelRole::Guest => ChannelRole::Guest,
175 ChannelRole::Banned => return Err(anyhow!("banned users cannot invite").into()),
176 };
177
178 room_participant::ActiveModel {
179 room_id: ActiveValue::set(room_id),
180 user_id: ActiveValue::set(called_user_id),
181 answering_connection_lost: ActiveValue::set(false),
182 participant_index: ActiveValue::NotSet,
183 calling_user_id: ActiveValue::set(calling_user_id),
184 calling_connection_id: ActiveValue::set(calling_connection.id as i32),
185 calling_connection_server_id: ActiveValue::set(Some(ServerId(
186 calling_connection.owner_id as i32,
187 ))),
188 initial_project_id: ActiveValue::set(initial_project_id),
189 role: ActiveValue::set(Some(called_user_role)),
190
191 id: ActiveValue::NotSet,
192 answering_connection_id: ActiveValue::NotSet,
193 answering_connection_server_id: ActiveValue::NotSet,
194 location_kind: ActiveValue::NotSet,
195 location_project_id: ActiveValue::NotSet,
196 }
197 .insert(&*tx)
198 .await?;
199
200 let room = self.get_room(room_id, &tx).await?;
201 let incoming_call = Self::build_incoming_call(&room, called_user_id)
202 .ok_or_else(|| anyhow!("failed to build incoming call"))?;
203 Ok((room, incoming_call))
204 })
205 .await
206 }
207
208 pub async fn call_failed(
209 &self,
210 room_id: RoomId,
211 called_user_id: UserId,
212 ) -> Result<RoomGuard<proto::Room>> {
213 self.room_transaction(room_id, |tx| async move {
214 room_participant::Entity::delete_many()
215 .filter(
216 room_participant::Column::RoomId
217 .eq(room_id)
218 .and(room_participant::Column::UserId.eq(called_user_id)),
219 )
220 .exec(&*tx)
221 .await?;
222 let room = self.get_room(room_id, &tx).await?;
223 Ok(room)
224 })
225 .await
226 }
227
228 pub async fn decline_call(
229 &self,
230 expected_room_id: Option<RoomId>,
231 user_id: UserId,
232 ) -> Result<Option<RoomGuard<proto::Room>>> {
233 self.optional_room_transaction(|tx| async move {
234 let mut filter = Condition::all()
235 .add(room_participant::Column::UserId.eq(user_id))
236 .add(room_participant::Column::AnsweringConnectionId.is_null());
237 if let Some(room_id) = expected_room_id {
238 filter = filter.add(room_participant::Column::RoomId.eq(room_id));
239 }
240 let participant = room_participant::Entity::find()
241 .filter(filter)
242 .one(&*tx)
243 .await?;
244
245 let participant = if let Some(participant) = participant {
246 participant
247 } else if expected_room_id.is_some() {
248 return Err(anyhow!("could not find call to decline"))?;
249 } else {
250 return Ok(None);
251 };
252
253 let room_id = participant.room_id;
254 room_participant::Entity::delete(participant.into_active_model())
255 .exec(&*tx)
256 .await?;
257
258 let room = self.get_room(room_id, &tx).await?;
259 Ok(Some((room_id, room)))
260 })
261 .await
262 }
263
264 pub async fn cancel_call(
265 &self,
266 room_id: RoomId,
267 calling_connection: ConnectionId,
268 called_user_id: UserId,
269 ) -> Result<RoomGuard<proto::Room>> {
270 self.room_transaction(room_id, |tx| async move {
271 let participant = room_participant::Entity::find()
272 .filter(
273 Condition::all()
274 .add(room_participant::Column::UserId.eq(called_user_id))
275 .add(room_participant::Column::RoomId.eq(room_id))
276 .add(
277 room_participant::Column::CallingConnectionId
278 .eq(calling_connection.id as i32),
279 )
280 .add(
281 room_participant::Column::CallingConnectionServerId
282 .eq(calling_connection.owner_id as i32),
283 )
284 .add(room_participant::Column::AnsweringConnectionId.is_null()),
285 )
286 .one(&*tx)
287 .await?
288 .ok_or_else(|| anyhow!("no call to cancel"))?;
289
290 room_participant::Entity::delete(participant.into_active_model())
291 .exec(&*tx)
292 .await?;
293
294 let room = self.get_room(room_id, &tx).await?;
295 Ok(room)
296 })
297 .await
298 }
299
300 pub async fn join_room(
301 &self,
302 room_id: RoomId,
303 user_id: UserId,
304 connection: ConnectionId,
305 environment: &str,
306 ) -> Result<RoomGuard<JoinRoom>> {
307 self.room_transaction(room_id, |tx| async move {
308 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
309 enum QueryChannelIdAndEnvironment {
310 ChannelId,
311 Environment,
312 }
313
314 let (channel_id, release_channel): (Option<ChannelId>, Option<String>) =
315 room::Entity::find()
316 .select_only()
317 .column(room::Column::ChannelId)
318 .column(room::Column::Environment)
319 .filter(room::Column::Id.eq(room_id))
320 .into_values::<_, QueryChannelIdAndEnvironment>()
321 .one(&*tx)
322 .await?
323 .ok_or_else(|| anyhow!("no such room"))?;
324
325 if let Some(release_channel) = release_channel {
326 if &release_channel != environment {
327 Err(anyhow!("must join using the {} release", release_channel))?;
328 }
329 }
330
331 if channel_id.is_some() {
332 Err(anyhow!("tried to join channel call directly"))?
333 }
334
335 let participant_index = self
336 .get_next_participant_index_internal(room_id, &*tx)
337 .await?;
338
339 let result = room_participant::Entity::update_many()
340 .filter(
341 Condition::all()
342 .add(room_participant::Column::RoomId.eq(room_id))
343 .add(room_participant::Column::UserId.eq(user_id))
344 .add(room_participant::Column::AnsweringConnectionId.is_null()),
345 )
346 .set(room_participant::ActiveModel {
347 participant_index: ActiveValue::Set(Some(participant_index)),
348 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
349 answering_connection_server_id: ActiveValue::set(Some(ServerId(
350 connection.owner_id as i32,
351 ))),
352 answering_connection_lost: ActiveValue::set(false),
353 ..Default::default()
354 })
355 .exec(&*tx)
356 .await?;
357 if result.rows_affected == 0 {
358 Err(anyhow!("room does not exist or was already joined"))?;
359 }
360
361 let room = self.get_room(room_id, &tx).await?;
362 Ok(JoinRoom {
363 room,
364 channel_id: None,
365 channel_members: vec![],
366 })
367 })
368 .await
369 }
370
371 async fn get_next_participant_index_internal(
372 &self,
373 room_id: RoomId,
374 tx: &DatabaseTransaction,
375 ) -> Result<i32> {
376 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
377 enum QueryParticipantIndices {
378 ParticipantIndex,
379 }
380 let existing_participant_indices: Vec<i32> = room_participant::Entity::find()
381 .filter(
382 room_participant::Column::RoomId
383 .eq(room_id)
384 .and(room_participant::Column::ParticipantIndex.is_not_null()),
385 )
386 .select_only()
387 .column(room_participant::Column::ParticipantIndex)
388 .into_values::<_, QueryParticipantIndices>()
389 .all(&*tx)
390 .await?;
391
392 let mut participant_index = 0;
393 while existing_participant_indices.contains(&participant_index) {
394 participant_index += 1;
395 }
396
397 Ok(participant_index)
398 }
399
400 /// Returns the channel ID for the given room, if it has one.
401 pub async fn channel_id_for_room(&self, room_id: RoomId) -> Result<Option<ChannelId>> {
402 self.transaction(|tx| async move {
403 let room: Option<room::Model> = room::Entity::find()
404 .filter(room::Column::Id.eq(room_id))
405 .one(&*tx)
406 .await?;
407
408 Ok(room.and_then(|room| room.channel_id))
409 })
410 .await
411 }
412
413 pub(crate) async fn join_channel_room_internal(
414 &self,
415 room_id: RoomId,
416 user_id: UserId,
417 connection: ConnectionId,
418 role: ChannelRole,
419 tx: &DatabaseTransaction,
420 ) -> Result<JoinRoom> {
421 let participant_index = self
422 .get_next_participant_index_internal(room_id, &*tx)
423 .await?;
424
425 room_participant::Entity::insert_many([room_participant::ActiveModel {
426 room_id: ActiveValue::set(room_id),
427 user_id: ActiveValue::set(user_id),
428 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
429 answering_connection_server_id: ActiveValue::set(Some(ServerId(
430 connection.owner_id as i32,
431 ))),
432 answering_connection_lost: ActiveValue::set(false),
433 calling_user_id: ActiveValue::set(user_id),
434 calling_connection_id: ActiveValue::set(connection.id as i32),
435 calling_connection_server_id: ActiveValue::set(Some(ServerId(
436 connection.owner_id as i32,
437 ))),
438 participant_index: ActiveValue::Set(Some(participant_index)),
439 role: ActiveValue::set(Some(role)),
440 id: ActiveValue::NotSet,
441 location_kind: ActiveValue::NotSet,
442 location_project_id: ActiveValue::NotSet,
443 initial_project_id: ActiveValue::NotSet,
444 }])
445 .on_conflict(
446 OnConflict::columns([room_participant::Column::UserId])
447 .update_columns([
448 room_participant::Column::AnsweringConnectionId,
449 room_participant::Column::AnsweringConnectionServerId,
450 room_participant::Column::AnsweringConnectionLost,
451 room_participant::Column::ParticipantIndex,
452 room_participant::Column::Role,
453 ])
454 .to_owned(),
455 )
456 .exec(&*tx)
457 .await?;
458
459 let (channel, room) = self.get_channel_room(room_id, &tx).await?;
460 let channel = channel.ok_or_else(|| anyhow!("no channel for room"))?;
461 let channel_members = self.get_channel_participants(&channel, &*tx).await?;
462 Ok(JoinRoom {
463 room,
464 channel_id: Some(channel.id),
465 channel_members,
466 })
467 }
468
/// Re-attaches `user_id` to a room over a new `connection`.
///
/// The participant row is updated only when the user already has an answering
/// connection that is either flagged as lost or belongs to a server other than
/// `connection`'s; if no row matches, the call fails. After that:
///
/// - each project in `rejoin_room.reshared_projects` must be hosted by
///   `user_id`; its host connection and host collaborator row are pointed at
///   `connection` and its worktrees are updated,
/// - projects hosted by `user_id` in this room that were not reshared are
///   deleted,
/// - for each project in `rejoin_room.rejoined_projects` that still exists and
///   in which the user is a collaborator, the collaborator row is moved to
///   `connection` and the worktrees, language servers, settings files and
///   remaining collaborators are collected for the caller.
pub async fn rejoin_room(
    &self,
    rejoin_room: proto::RejoinRoom,
    user_id: UserId,
    connection: ConnectionId,
) -> Result<RoomGuard<RejoinedRoom>> {
    let room_id = RoomId::from_proto(rejoin_room.id);
    self.room_transaction(room_id, |tx| async {
        // NOTE(review): the async block is not `move`; this rebinding presumably
        // exists to move `tx` into it while `rejoin_room` stays borrowed — confirm.
        let tx = tx;
        let participant_update = room_participant::Entity::update_many()
            .filter(
                Condition::all()
                    .add(room_participant::Column::RoomId.eq(room_id))
                    .add(room_participant::Column::UserId.eq(user_id))
                    .add(room_participant::Column::AnsweringConnectionId.is_not_null())
                    .add(
                        Condition::any()
                            .add(room_participant::Column::AnsweringConnectionLost.eq(true))
                            .add(
                                room_participant::Column::AnsweringConnectionServerId
                                    .ne(connection.owner_id as i32),
                            ),
                    ),
            )
            .set(room_participant::ActiveModel {
                answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
                answering_connection_server_id: ActiveValue::set(Some(ServerId(
                    connection.owner_id as i32,
                ))),
                answering_connection_lost: ActiveValue::set(false),
                ..Default::default()
            })
            .exec(&*tx)
            .await?;
        // No row matched the filter above, so there is nothing to rejoin.
        if participant_update.rows_affected == 0 {
            return Err(anyhow!("room does not exist or was already joined"))?;
        }

        let mut reshared_projects = Vec::new();
        for reshared_project in &rejoin_room.reshared_projects {
            let project_id = ProjectId::from_proto(reshared_project.project_id);
            let project = project::Entity::find_by_id(project_id)
                .one(&*tx)
                .await?
                .ok_or_else(|| anyhow!("project does not exist"))?;
            // Only the recorded host of a project may reshare it.
            if project.host_user_id != user_id {
                return Err(anyhow!("no such project"))?;
            }

            let mut collaborators = project
                .find_related(project_collaborator::Entity)
                .all(&*tx)
                .await?;
            let host_ix = collaborators
                .iter()
                .position(|collaborator| {
                    collaborator.user_id == user_id && collaborator.is_host
                })
                .ok_or_else(|| anyhow!("host not found among collaborators"))?;
            // Take the host out so that `collaborators` holds only the other
            // collaborators; the connection is read before it is overwritten.
            let host = collaborators.swap_remove(host_ix);
            let old_connection_id = host.connection();

            project::Entity::update(project::ActiveModel {
                host_connection_id: ActiveValue::set(Some(connection.id as i32)),
                host_connection_server_id: ActiveValue::set(Some(ServerId(
                    connection.owner_id as i32,
                ))),
                ..project.into_active_model()
            })
            .exec(&*tx)
            .await?;
            project_collaborator::Entity::update(project_collaborator::ActiveModel {
                connection_id: ActiveValue::set(connection.id as i32),
                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
                ..host.into_active_model()
            })
            .exec(&*tx)
            .await?;

            self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
                .await?;

            reshared_projects.push(ResharedProject {
                id: project_id,
                old_connection_id,
                collaborators: collaborators
                    .iter()
                    .map(|collaborator| ProjectCollaborator {
                        connection_id: collaborator.connection(),
                        user_id: collaborator.user_id,
                        replica_id: collaborator.replica_id,
                        is_host: collaborator.is_host,
                    })
                    .collect(),
                worktrees: reshared_project.worktrees.clone(),
            });
        }

        // Delete the projects this user hosts in the room that were not reshared.
        project::Entity::delete_many()
            .filter(
                Condition::all()
                    .add(project::Column::RoomId.eq(room_id))
                    .add(project::Column::HostUserId.eq(user_id))
                    .add(
                        project::Column::Id
                            .is_not_in(reshared_projects.iter().map(|project| project.id)),
                    ),
            )
            .exec(&*tx)
            .await?;

        let mut rejoined_projects = Vec::new();
        for rejoined_project in &rejoin_room.rejoined_projects {
            let project_id = ProjectId::from_proto(rejoined_project.id);
            // Projects that no longer exist are silently skipped.
            let Some(project) = project::Entity::find_by_id(project_id).one(&*tx).await? else {
                continue;
            };

            let mut worktrees = Vec::new();
            let db_worktrees = project.find_related(worktree::Entity).all(&*tx).await?;
            for db_worktree in db_worktrees {
                let mut worktree = RejoinedWorktree {
                    id: db_worktree.id as u64,
                    abs_path: db_worktree.abs_path,
                    root_name: db_worktree.root_name,
                    visible: db_worktree.visible,
                    updated_entries: Default::default(),
                    removed_entries: Default::default(),
                    updated_repositories: Default::default(),
                    removed_repositories: Default::default(),
                    diagnostic_summaries: Default::default(),
                    settings_files: Default::default(),
                    scan_id: db_worktree.scan_id as u64,
                    completed_scan_id: db_worktree.completed_scan_id as u64,
                };

                // The client's own record of this worktree, if it reported one.
                let rejoined_worktree = rejoined_project
                    .worktrees
                    .iter()
                    .find(|worktree| worktree.id == db_worktree.id as u64);

                // File entries
                {
                    // If the client reported this worktree, select only rows with a
                    // scan ID greater than the one it reported; otherwise select
                    // every row that is not deleted.
                    let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
                        worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
                    } else {
                        worktree_entry::Column::IsDeleted.eq(false)
                    };

                    let mut db_entries = worktree_entry::Entity::find()
                        .filter(
                            Condition::all()
                                .add(worktree_entry::Column::ProjectId.eq(project.id))
                                .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
                                .add(entry_filter),
                        )
                        .stream(&*tx)
                        .await?;

                    while let Some(db_entry) = db_entries.next().await {
                        let db_entry = db_entry?;
                        if db_entry.is_deleted {
                            worktree.removed_entries.push(db_entry.id as u64);
                        } else {
                            worktree.updated_entries.push(proto::Entry {
                                id: db_entry.id as u64,
                                is_dir: db_entry.is_dir,
                                path: db_entry.path,
                                inode: db_entry.inode as u64,
                                mtime: Some(proto::Timestamp {
                                    seconds: db_entry.mtime_seconds as u64,
                                    nanos: db_entry.mtime_nanos as u32,
                                }),
                                is_symlink: db_entry.is_symlink,
                                is_ignored: db_entry.is_ignored,
                                is_external: db_entry.is_external,
                                git_status: db_entry.git_status.map(|status| status as i32),
                            });
                        }
                    }
                }

                // Repository Entries
                {
                    // Same selection rule as for file entries above.
                    let repository_entry_filter =
                        if let Some(rejoined_worktree) = rejoined_worktree {
                            worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id)
                        } else {
                            worktree_repository::Column::IsDeleted.eq(false)
                        };

                    let mut db_repositories = worktree_repository::Entity::find()
                        .filter(
                            Condition::all()
                                .add(worktree_repository::Column::ProjectId.eq(project.id))
                                .add(worktree_repository::Column::WorktreeId.eq(worktree.id))
                                .add(repository_entry_filter),
                        )
                        .stream(&*tx)
                        .await?;

                    while let Some(db_repository) = db_repositories.next().await {
                        let db_repository = db_repository?;
                        if db_repository.is_deleted {
                            worktree
                                .removed_repositories
                                .push(db_repository.work_directory_id as u64);
                        } else {
                            worktree.updated_repositories.push(proto::RepositoryEntry {
                                work_directory_id: db_repository.work_directory_id as u64,
                                branch: db_repository.branch,
                            });
                        }
                    }
                }

                worktrees.push(worktree);
            }

            let language_servers = project
                .find_related(language_server::Entity)
                .all(&*tx)
                .await?
                .into_iter()
                .map(|language_server| proto::LanguageServer {
                    id: language_server.id as u64,
                    name: language_server.name,
                })
                .collect::<Vec<_>>();

            // Attach each settings file to the worktree it belongs to; files whose
            // worktree was not loaded above are dropped.
            {
                let mut db_settings_files = worktree_settings_file::Entity::find()
                    .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
                    .stream(&*tx)
                    .await?;
                while let Some(db_settings_file) = db_settings_files.next().await {
                    let db_settings_file = db_settings_file?;
                    if let Some(worktree) = worktrees
                        .iter_mut()
                        .find(|w| w.id == db_settings_file.worktree_id as u64)
                    {
                        worktree.settings_files.push(WorktreeSettingsFile {
                            path: db_settings_file.path,
                            content: db_settings_file.content,
                        });
                    }
                }
            }

            let mut collaborators = project
                .find_related(project_collaborator::Entity)
                .all(&*tx)
                .await?;
            // Skip the project entirely when the user is not one of its
            // collaborators; the data gathered above is discarded in that case.
            let self_collaborator = if let Some(self_collaborator_ix) = collaborators
                .iter()
                .position(|collaborator| collaborator.user_id == user_id)
            {
                collaborators.swap_remove(self_collaborator_ix)
            } else {
                continue;
            };
            let old_connection_id = self_collaborator.connection();
            project_collaborator::Entity::update(project_collaborator::ActiveModel {
                connection_id: ActiveValue::set(connection.id as i32),
                connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
                ..self_collaborator.into_active_model()
            })
            .exec(&*tx)
            .await?;

            let collaborators = collaborators
                .into_iter()
                .map(|collaborator| ProjectCollaborator {
                    connection_id: collaborator.connection(),
                    user_id: collaborator.user_id,
                    replica_id: collaborator.replica_id,
                    is_host: collaborator.is_host,
                })
                .collect::<Vec<_>>();

            rejoined_projects.push(RejoinedProject {
                id: project_id,
                old_connection_id,
                collaborators,
                worktrees,
                language_servers,
            });
        }

        let (channel, room) = self.get_channel_room(room_id, &tx).await?;
        let channel_members = if let Some(channel) = &channel {
            self.get_channel_participants(&channel, &tx).await?
        } else {
            Vec::new()
        };

        Ok(RejoinedRoom {
            room,
            channel_id: channel.map(|channel| channel.id),
            channel_members,
            rejoined_projects,
            reshared_projects,
        })
    })
    .await
}
775
/// Removes the participant whose answering connection is `connection` from
/// its room.
///
/// Returns `Ok(None)` when no participant is attached to that connection.
/// Otherwise the participant row is deleted, unanswered calls placed by the
/// leaving user are canceled, the connection's project collaborator rows and
/// follower rows are deleted, and projects hosted by the connection in the
/// room are deleted. The room itself is deleted when the reloaded room has no
/// participants. The returned `LeftRoom` describes the affected projects, the
/// canceled calls and whether the room was deleted.
pub async fn leave_room(
    &self,
    connection: ConnectionId,
) -> Result<Option<RoomGuard<LeftRoom>>> {
    self.optional_room_transaction(|tx| async move {
        let leaving_participant = room_participant::Entity::find()
            .filter(
                Condition::all()
                    .add(
                        room_participant::Column::AnsweringConnectionId
                            .eq(connection.id as i32),
                    )
                    .add(
                        room_participant::Column::AnsweringConnectionServerId
                            .eq(connection.owner_id as i32),
                    ),
            )
            .one(&*tx)
            .await?;

        if let Some(leaving_participant) = leaving_participant {
            // Leave room.
            let room_id = leaving_participant.room_id;
            room_participant::Entity::delete_by_id(leaving_participant.id)
                .exec(&*tx)
                .await?;

            // Cancel pending calls initiated by the leaving user.
            let called_participants = room_participant::Entity::find()
                .filter(
                    Condition::all()
                        .add(
                            room_participant::Column::CallingUserId
                                .eq(leaving_participant.user_id),
                        )
                        .add(room_participant::Column::AnsweringConnectionId.is_null()),
                )
                .all(&*tx)
                .await?;
            room_participant::Entity::delete_many()
                .filter(
                    room_participant::Column::Id
                        .is_in(called_participants.iter().map(|participant| participant.id)),
                )
                .exec(&*tx)
                .await?;
            let canceled_calls_to_user_ids = called_participants
                .into_iter()
                .map(|participant| participant.user_id)
                .collect();

            // Detect left projects.
            #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
            enum QueryProjectIds {
                ProjectId,
            }
            // IDs of every project this connection collaborates on.
            let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
                .select_only()
                .column_as(
                    project_collaborator::Column::ProjectId,
                    QueryProjectIds::ProjectId,
                )
                .filter(
                    Condition::all()
                        .add(
                            project_collaborator::Column::ConnectionId.eq(connection.id as i32),
                        )
                        .add(
                            project_collaborator::Column::ConnectionServerId
                                .eq(connection.owner_id as i32),
                        ),
                )
                .into_values::<_, QueryProjectIds>()
                .all(&*tx)
                .await?;
            // Group all collaborators of those projects by project, recording the
            // host and every connection other than the leaving one.
            let mut left_projects = HashMap::default();
            let mut collaborators = project_collaborator::Entity::find()
                .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
                .stream(&*tx)
                .await?;
            while let Some(collaborator) = collaborators.next().await {
                let collaborator = collaborator?;
                let left_project =
                    left_projects
                        .entry(collaborator.project_id)
                        .or_insert(LeftProject {
                            id: collaborator.project_id,
                            host_user_id: Default::default(),
                            connection_ids: Default::default(),
                            host_connection_id: Default::default(),
                        });

                let collaborator_connection_id = collaborator.connection();
                if collaborator_connection_id != connection {
                    left_project.connection_ids.push(collaborator_connection_id);
                }

                if collaborator.is_host {
                    left_project.host_user_id = collaborator.user_id;
                    left_project.host_connection_id = collaborator_connection_id;
                }
            }
            // NOTE(review): presumably the stream must be released before further
            // statements can run on `tx` — confirm before reordering.
            drop(collaborators);

            // Leave projects.
            project_collaborator::Entity::delete_many()
                .filter(
                    Condition::all()
                        .add(
                            project_collaborator::Column::ConnectionId.eq(connection.id as i32),
                        )
                        .add(
                            project_collaborator::Column::ConnectionServerId
                                .eq(connection.owner_id as i32),
                        ),
                )
                .exec(&*tx)
                .await?;

            // NOTE(review): unlike every other filter in this function, this one
            // matches on the connection ID alone and not on the owning server.
            // Verify whether the follower entity has a server ID column that
            // should be matched as well.
            follower::Entity::delete_many()
                .filter(
                    Condition::all()
                        .add(follower::Column::FollowerConnectionId.eq(connection.id as i32)),
                )
                .exec(&*tx)
                .await?;

            // Unshare projects.
            project::Entity::delete_many()
                .filter(
                    Condition::all()
                        .add(project::Column::RoomId.eq(room_id))
                        .add(project::Column::HostConnectionId.eq(connection.id as i32))
                        .add(
                            project::Column::HostConnectionServerId
                                .eq(connection.owner_id as i32),
                        ),
                )
                .exec(&*tx)
                .await?;

            // Reload the room after all deletions; delete it if nobody is left.
            let (channel, room) = self.get_channel_room(room_id, &tx).await?;
            let deleted = if room.participants.is_empty() {
                let result = room::Entity::delete_by_id(room_id).exec(&*tx).await?;
                result.rows_affected > 0
            } else {
                false
            };

            let channel_members = if let Some(channel) = &channel {
                self.get_channel_participants(channel, &tx).await?
            } else {
                Vec::new()
            };
            let left_room = LeftRoom {
                room,
                channel_id: channel.map(|channel| channel.id),
                channel_members,
                left_projects,
                canceled_calls_to_user_ids,
                deleted,
            };

            // Drop this room's entry from `self.rooms` once it has no participants.
            if left_room.room.participants.is_empty() {
                self.rooms.remove(&room_id);
            }

            Ok(Some((room_id, left_room)))
        } else {
            Ok(None)
        }
    })
    .await
}
950
951 /// Updates the location of a participant in the given room.
952 pub async fn update_room_participant_location(
953 &self,
954 room_id: RoomId,
955 connection: ConnectionId,
956 location: proto::ParticipantLocation,
957 ) -> Result<RoomGuard<proto::Room>> {
958 self.room_transaction(room_id, |tx| async {
959 let tx = tx;
960 let location_kind;
961 let location_project_id;
962 match location
963 .variant
964 .as_ref()
965 .ok_or_else(|| anyhow!("invalid location"))?
966 {
967 proto::participant_location::Variant::SharedProject(project) => {
968 location_kind = 0;
969 location_project_id = Some(ProjectId::from_proto(project.id));
970 }
971 proto::participant_location::Variant::UnsharedProject(_) => {
972 location_kind = 1;
973 location_project_id = None;
974 }
975 proto::participant_location::Variant::External(_) => {
976 location_kind = 2;
977 location_project_id = None;
978 }
979 }
980
981 let result = room_participant::Entity::update_many()
982 .filter(
983 Condition::all()
984 .add(room_participant::Column::RoomId.eq(room_id))
985 .add(
986 room_participant::Column::AnsweringConnectionId
987 .eq(connection.id as i32),
988 )
989 .add(
990 room_participant::Column::AnsweringConnectionServerId
991 .eq(connection.owner_id as i32),
992 ),
993 )
994 .set(room_participant::ActiveModel {
995 location_kind: ActiveValue::set(Some(location_kind)),
996 location_project_id: ActiveValue::set(location_project_id),
997 ..Default::default()
998 })
999 .exec(&*tx)
1000 .await?;
1001
1002 if result.rows_affected == 1 {
1003 let room = self.get_room(room_id, &tx).await?;
1004 Ok(room)
1005 } else {
1006 Err(anyhow!("could not update room participant location"))?
1007 }
1008 })
1009 .await
1010 }
1011
1012 /// Sets the role of a participant in the given room.
1013 pub async fn set_room_participant_role(
1014 &self,
1015 admin_id: UserId,
1016 room_id: RoomId,
1017 user_id: UserId,
1018 role: ChannelRole,
1019 ) -> Result<RoomGuard<proto::Room>> {
1020 self.room_transaction(room_id, |tx| async move {
1021 room_participant::Entity::find()
1022 .filter(
1023 Condition::all()
1024 .add(room_participant::Column::RoomId.eq(room_id))
1025 .add(room_participant::Column::UserId.eq(admin_id))
1026 .add(room_participant::Column::Role.eq(ChannelRole::Admin)),
1027 )
1028 .one(&*tx)
1029 .await?
1030 .ok_or_else(|| anyhow!("only admins can set participant role"))?;
1031
1032 if role.requires_cla() {
1033 self.check_user_has_signed_cla(user_id, room_id, &*tx)
1034 .await?;
1035 }
1036
1037 let result = room_participant::Entity::update_many()
1038 .filter(
1039 Condition::all()
1040 .add(room_participant::Column::RoomId.eq(room_id))
1041 .add(room_participant::Column::UserId.eq(user_id)),
1042 )
1043 .set(room_participant::ActiveModel {
1044 role: ActiveValue::set(Some(ChannelRole::from(role))),
1045 ..Default::default()
1046 })
1047 .exec(&*tx)
1048 .await?;
1049
1050 if result.rows_affected != 1 {
1051 Err(anyhow!("could not update room participant role"))?;
1052 }
1053 Ok(self.get_room(room_id, &tx).await?)
1054 })
1055 .await
1056 }
1057
1058 async fn check_user_has_signed_cla(
1059 &self,
1060 user_id: UserId,
1061 room_id: RoomId,
1062 tx: &DatabaseTransaction,
1063 ) -> Result<()> {
1064 let channel = room::Entity::find_by_id(room_id)
1065 .one(&*tx)
1066 .await?
1067 .ok_or_else(|| anyhow!("could not find room"))?
1068 .find_related(channel::Entity)
1069 .one(&*tx)
1070 .await?;
1071
1072 if let Some(channel) = channel {
1073 let requires_zed_cla = channel.requires_zed_cla
1074 || channel::Entity::find()
1075 .filter(
1076 channel::Column::Id
1077 .is_in(channel.ancestors())
1078 .and(channel::Column::RequiresZedCla.eq(true)),
1079 )
1080 .count(&*tx)
1081 .await?
1082 > 0;
1083 if requires_zed_cla {
1084 if contributor::Entity::find()
1085 .filter(contributor::Column::UserId.eq(user_id))
1086 .one(&*tx)
1087 .await?
1088 .is_none()
1089 {
1090 Err(anyhow!("user has not signed the Zed CLA"))?;
1091 }
1092 }
1093 }
1094 Ok(())
1095 }
1096
1097 pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
1098 self.transaction(|tx| async move {
1099 self.room_connection_lost(connection, &*tx).await?;
1100 self.channel_buffer_connection_lost(connection, &*tx)
1101 .await?;
1102 self.channel_chat_connection_lost(connection, &*tx).await?;
1103 Ok(())
1104 })
1105 .await
1106 }
1107
1108 pub async fn room_connection_lost(
1109 &self,
1110 connection: ConnectionId,
1111 tx: &DatabaseTransaction,
1112 ) -> Result<()> {
1113 let participant = room_participant::Entity::find()
1114 .filter(
1115 Condition::all()
1116 .add(room_participant::Column::AnsweringConnectionId.eq(connection.id as i32))
1117 .add(
1118 room_participant::Column::AnsweringConnectionServerId
1119 .eq(connection.owner_id as i32),
1120 ),
1121 )
1122 .one(&*tx)
1123 .await?;
1124
1125 if let Some(participant) = participant {
1126 room_participant::Entity::update(room_participant::ActiveModel {
1127 answering_connection_lost: ActiveValue::set(true),
1128 ..participant.into_active_model()
1129 })
1130 .exec(&*tx)
1131 .await?;
1132 }
1133 Ok(())
1134 }
1135
1136 fn build_incoming_call(
1137 room: &proto::Room,
1138 called_user_id: UserId,
1139 ) -> Option<proto::IncomingCall> {
1140 let pending_participant = room
1141 .pending_participants
1142 .iter()
1143 .find(|participant| participant.user_id == called_user_id.to_proto())?;
1144
1145 Some(proto::IncomingCall {
1146 room_id: room.id,
1147 calling_user_id: pending_participant.calling_user_id,
1148 participant_user_ids: room
1149 .participants
1150 .iter()
1151 .map(|participant| participant.user_id)
1152 .collect(),
1153 initial_project: room.participants.iter().find_map(|participant| {
1154 let initial_project_id = pending_participant.initial_project_id?;
1155 participant
1156 .projects
1157 .iter()
1158 .find(|project| project.id == initial_project_id)
1159 .cloned()
1160 }),
1161 })
1162 }
1163
1164 pub async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1165 let (_, room) = self.get_channel_room(room_id, tx).await?;
1166 Ok(room)
1167 }
1168
1169 pub async fn room_connection_ids(
1170 &self,
1171 room_id: RoomId,
1172 connection_id: ConnectionId,
1173 ) -> Result<RoomGuard<HashSet<ConnectionId>>> {
1174 self.room_transaction(room_id, |tx| async move {
1175 let mut participants = room_participant::Entity::find()
1176 .filter(room_participant::Column::RoomId.eq(room_id))
1177 .stream(&*tx)
1178 .await?;
1179
1180 let mut is_participant = false;
1181 let mut connection_ids = HashSet::default();
1182 while let Some(participant) = participants.next().await {
1183 let participant = participant?;
1184 if let Some(answering_connection) = participant.answering_connection() {
1185 if answering_connection == connection_id {
1186 is_participant = true;
1187 } else {
1188 connection_ids.insert(answering_connection);
1189 }
1190 }
1191 }
1192
1193 if !is_participant {
1194 Err(anyhow!("not a room participant"))?;
1195 }
1196
1197 Ok(connection_ids)
1198 })
1199 .await
1200 }
1201
1202 async fn get_channel_room(
1203 &self,
1204 room_id: RoomId,
1205 tx: &DatabaseTransaction,
1206 ) -> Result<(Option<channel::Model>, proto::Room)> {
1207 let db_room = room::Entity::find_by_id(room_id)
1208 .one(tx)
1209 .await?
1210 .ok_or_else(|| anyhow!("could not find room"))?;
1211
1212 let mut db_participants = db_room
1213 .find_related(room_participant::Entity)
1214 .stream(tx)
1215 .await?;
1216 let mut participants = HashMap::default();
1217 let mut pending_participants = Vec::new();
1218 while let Some(db_participant) = db_participants.next().await {
1219 let db_participant = db_participant?;
1220 if let (
1221 Some(answering_connection_id),
1222 Some(answering_connection_server_id),
1223 Some(participant_index),
1224 ) = (
1225 db_participant.answering_connection_id,
1226 db_participant.answering_connection_server_id,
1227 db_participant.participant_index,
1228 ) {
1229 let location = match (
1230 db_participant.location_kind,
1231 db_participant.location_project_id,
1232 ) {
1233 (Some(0), Some(project_id)) => {
1234 Some(proto::participant_location::Variant::SharedProject(
1235 proto::participant_location::SharedProject {
1236 id: project_id.to_proto(),
1237 },
1238 ))
1239 }
1240 (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1241 Default::default(),
1242 )),
1243 _ => Some(proto::participant_location::Variant::External(
1244 Default::default(),
1245 )),
1246 };
1247
1248 let answering_connection = ConnectionId {
1249 owner_id: answering_connection_server_id.0 as u32,
1250 id: answering_connection_id as u32,
1251 };
1252 participants.insert(
1253 answering_connection,
1254 proto::Participant {
1255 user_id: db_participant.user_id.to_proto(),
1256 peer_id: Some(answering_connection.into()),
1257 projects: Default::default(),
1258 location: Some(proto::ParticipantLocation { variant: location }),
1259 participant_index: participant_index as u32,
1260 role: db_participant.role.unwrap_or(ChannelRole::Member).into(),
1261 },
1262 );
1263 } else {
1264 pending_participants.push(proto::PendingParticipant {
1265 user_id: db_participant.user_id.to_proto(),
1266 calling_user_id: db_participant.calling_user_id.to_proto(),
1267 initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1268 });
1269 }
1270 }
1271 drop(db_participants);
1272
1273 let mut db_projects = db_room
1274 .find_related(project::Entity)
1275 .find_with_related(worktree::Entity)
1276 .stream(tx)
1277 .await?;
1278
1279 while let Some(row) = db_projects.next().await {
1280 let (db_project, db_worktree) = row?;
1281 let host_connection = db_project.host_connection()?;
1282 if let Some(participant) = participants.get_mut(&host_connection) {
1283 let project = if let Some(project) = participant
1284 .projects
1285 .iter_mut()
1286 .find(|project| project.id == db_project.id.to_proto())
1287 {
1288 project
1289 } else {
1290 participant.projects.push(proto::ParticipantProject {
1291 id: db_project.id.to_proto(),
1292 worktree_root_names: Default::default(),
1293 });
1294 participant.projects.last_mut().unwrap()
1295 };
1296
1297 if let Some(db_worktree) = db_worktree {
1298 if db_worktree.visible {
1299 project.worktree_root_names.push(db_worktree.root_name);
1300 }
1301 }
1302 }
1303 }
1304 drop(db_projects);
1305
1306 let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
1307 let mut followers = Vec::new();
1308 while let Some(db_follower) = db_followers.next().await {
1309 let db_follower = db_follower?;
1310 followers.push(proto::Follower {
1311 leader_id: Some(db_follower.leader_connection().into()),
1312 follower_id: Some(db_follower.follower_connection().into()),
1313 project_id: db_follower.project_id.to_proto(),
1314 });
1315 }
1316 drop(db_followers);
1317
1318 let channel = if let Some(channel_id) = db_room.channel_id {
1319 Some(self.get_channel_internal(channel_id, &*tx).await?)
1320 } else {
1321 None
1322 };
1323
1324 Ok((
1325 channel,
1326 proto::Room {
1327 id: db_room.id.to_proto(),
1328 live_kit_room: db_room.live_kit_room,
1329 participants: participants.into_values().collect(),
1330 pending_participants,
1331 followers,
1332 },
1333 ))
1334 }
1335}