1use anyhow::Context as _;
2
3use super::*;
4
5impl Database {
6 /// Clears all room participants in rooms attached to a stale server.
7 pub async fn clear_stale_room_participants(
8 &self,
9 room_id: RoomId,
10 new_server_id: ServerId,
11 ) -> Result<TransactionGuard<RefreshedRoom>> {
12 self.room_transaction(room_id, |tx| async move {
13 let stale_participant_filter = Condition::all()
14 .add(room_participant::Column::RoomId.eq(room_id))
15 .add(room_participant::Column::AnsweringConnectionId.is_not_null())
16 .add(room_participant::Column::AnsweringConnectionServerId.ne(new_server_id));
17
18 let stale_participant_user_ids = room_participant::Entity::find()
19 .filter(stale_participant_filter.clone())
20 .all(&*tx)
21 .await?
22 .into_iter()
23 .map(|participant| participant.user_id)
24 .collect::<Vec<_>>();
25
26 // Delete participants who failed to reconnect and cancel their calls.
27 let mut canceled_calls_to_user_ids = Vec::new();
28 room_participant::Entity::delete_many()
29 .filter(stale_participant_filter)
30 .exec(&*tx)
31 .await?;
32 let called_participants = room_participant::Entity::find()
33 .filter(
34 Condition::all()
35 .add(
36 room_participant::Column::CallingUserId
37 .is_in(stale_participant_user_ids.iter().copied()),
38 )
39 .add(room_participant::Column::AnsweringConnectionId.is_null()),
40 )
41 .all(&*tx)
42 .await?;
43 room_participant::Entity::delete_many()
44 .filter(
45 room_participant::Column::Id
46 .is_in(called_participants.iter().map(|participant| participant.id)),
47 )
48 .exec(&*tx)
49 .await?;
50 canceled_calls_to_user_ids.extend(
51 called_participants
52 .into_iter()
53 .map(|participant| participant.user_id),
54 );
55
56 let (channel, room) = self.get_channel_room(room_id, &tx).await?;
57 if channel.is_none() {
58 // Delete the room if it becomes empty.
59 if room.participants.is_empty() {
60 project::Entity::delete_many()
61 .filter(project::Column::RoomId.eq(room_id))
62 .exec(&*tx)
63 .await?;
64 room::Entity::delete_by_id(room_id).exec(&*tx).await?;
65 }
66 };
67
68 Ok(RefreshedRoom {
69 room,
70 channel,
71 stale_participant_user_ids,
72 canceled_calls_to_user_ids,
73 })
74 })
75 .await
76 }
77
78 /// Returns the incoming calls for user with the given ID.
79 pub async fn incoming_call_for_user(
80 &self,
81 user_id: UserId,
82 ) -> Result<Option<proto::IncomingCall>> {
83 self.transaction(|tx| async move {
84 let pending_participant = room_participant::Entity::find()
85 .filter(
86 room_participant::Column::UserId
87 .eq(user_id)
88 .and(room_participant::Column::AnsweringConnectionId.is_null()),
89 )
90 .one(&*tx)
91 .await?;
92
93 if let Some(pending_participant) = pending_participant {
94 let room = self.get_room(pending_participant.room_id, &tx).await?;
95 Ok(Self::build_incoming_call(&room, user_id))
96 } else {
97 Ok(None)
98 }
99 })
100 .await
101 }
102
103 /// Creates a new room.
104 pub async fn create_room(
105 &self,
106 user_id: UserId,
107 connection: ConnectionId,
108 livekit_room: &str,
109 ) -> Result<proto::Room> {
110 self.transaction(|tx| async move {
111 let room = room::ActiveModel {
112 live_kit_room: ActiveValue::set(livekit_room.into()),
113 ..Default::default()
114 }
115 .insert(&*tx)
116 .await?;
117 room_participant::ActiveModel {
118 room_id: ActiveValue::set(room.id),
119 user_id: ActiveValue::set(user_id),
120 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
121 answering_connection_server_id: ActiveValue::set(Some(ServerId(
122 connection.owner_id as i32,
123 ))),
124 answering_connection_lost: ActiveValue::set(false),
125 calling_user_id: ActiveValue::set(user_id),
126 calling_connection_id: ActiveValue::set(connection.id as i32),
127 calling_connection_server_id: ActiveValue::set(Some(ServerId(
128 connection.owner_id as i32,
129 ))),
130 participant_index: ActiveValue::set(Some(0)),
131 role: ActiveValue::set(Some(ChannelRole::Admin)),
132
133 id: ActiveValue::NotSet,
134 location_kind: ActiveValue::NotSet,
135 location_project_id: ActiveValue::NotSet,
136 initial_project_id: ActiveValue::NotSet,
137 }
138 .insert(&*tx)
139 .await?;
140
141 let room = self.get_room(room.id, &tx).await?;
142 Ok(room)
143 })
144 .await
145 }
146
147 pub async fn call(
148 &self,
149 room_id: RoomId,
150 calling_user_id: UserId,
151 calling_connection: ConnectionId,
152 called_user_id: UserId,
153 initial_project_id: Option<ProjectId>,
154 ) -> Result<TransactionGuard<(proto::Room, proto::IncomingCall)>> {
155 self.room_transaction(room_id, |tx| async move {
156 let caller = room_participant::Entity::find()
157 .filter(
158 room_participant::Column::UserId
159 .eq(calling_user_id)
160 .and(room_participant::Column::RoomId.eq(room_id)),
161 )
162 .one(&*tx)
163 .await?
164 .context("user is not in the room")?;
165
166 let called_user_role = match caller.role.unwrap_or(ChannelRole::Member) {
167 ChannelRole::Admin | ChannelRole::Member => ChannelRole::Member,
168 ChannelRole::Guest | ChannelRole::Talker => ChannelRole::Guest,
169 ChannelRole::Banned => return Err(anyhow!("banned users cannot invite").into()),
170 };
171
172 room_participant::ActiveModel {
173 room_id: ActiveValue::set(room_id),
174 user_id: ActiveValue::set(called_user_id),
175 answering_connection_lost: ActiveValue::set(false),
176 participant_index: ActiveValue::NotSet,
177 calling_user_id: ActiveValue::set(calling_user_id),
178 calling_connection_id: ActiveValue::set(calling_connection.id as i32),
179 calling_connection_server_id: ActiveValue::set(Some(ServerId(
180 calling_connection.owner_id as i32,
181 ))),
182 initial_project_id: ActiveValue::set(initial_project_id),
183 role: ActiveValue::set(Some(called_user_role)),
184
185 id: ActiveValue::NotSet,
186 answering_connection_id: ActiveValue::NotSet,
187 answering_connection_server_id: ActiveValue::NotSet,
188 location_kind: ActiveValue::NotSet,
189 location_project_id: ActiveValue::NotSet,
190 }
191 .insert(&*tx)
192 .await?;
193
194 let room = self.get_room(room_id, &tx).await?;
195 let incoming_call = Self::build_incoming_call(&room, called_user_id)
196 .context("failed to build incoming call")?;
197 Ok((room, incoming_call))
198 })
199 .await
200 }
201
202 pub async fn call_failed(
203 &self,
204 room_id: RoomId,
205 called_user_id: UserId,
206 ) -> Result<TransactionGuard<proto::Room>> {
207 self.room_transaction(room_id, |tx| async move {
208 room_participant::Entity::delete_many()
209 .filter(
210 room_participant::Column::RoomId
211 .eq(room_id)
212 .and(room_participant::Column::UserId.eq(called_user_id)),
213 )
214 .exec(&*tx)
215 .await?;
216 let room = self.get_room(room_id, &tx).await?;
217 Ok(room)
218 })
219 .await
220 }
221
222 pub async fn decline_call(
223 &self,
224 expected_room_id: Option<RoomId>,
225 user_id: UserId,
226 ) -> Result<Option<TransactionGuard<proto::Room>>> {
227 self.optional_room_transaction(|tx| async move {
228 let mut filter = Condition::all()
229 .add(room_participant::Column::UserId.eq(user_id))
230 .add(room_participant::Column::AnsweringConnectionId.is_null());
231 if let Some(room_id) = expected_room_id {
232 filter = filter.add(room_participant::Column::RoomId.eq(room_id));
233 }
234 let participant = room_participant::Entity::find()
235 .filter(filter)
236 .one(&*tx)
237 .await?;
238
239 let participant = if let Some(participant) = participant {
240 participant
241 } else if expected_room_id.is_some() {
242 return Err(anyhow!("could not find call to decline"))?;
243 } else {
244 return Ok(None);
245 };
246
247 let room_id = participant.room_id;
248 room_participant::Entity::delete(participant.into_active_model())
249 .exec(&*tx)
250 .await?;
251
252 let room = self.get_room(room_id, &tx).await?;
253 Ok(Some((room_id, room)))
254 })
255 .await
256 }
257
258 pub async fn cancel_call(
259 &self,
260 room_id: RoomId,
261 calling_connection: ConnectionId,
262 called_user_id: UserId,
263 ) -> Result<TransactionGuard<proto::Room>> {
264 self.room_transaction(room_id, |tx| async move {
265 let participant = room_participant::Entity::find()
266 .filter(
267 Condition::all()
268 .add(room_participant::Column::UserId.eq(called_user_id))
269 .add(room_participant::Column::RoomId.eq(room_id))
270 .add(
271 room_participant::Column::CallingConnectionId
272 .eq(calling_connection.id as i32),
273 )
274 .add(
275 room_participant::Column::CallingConnectionServerId
276 .eq(calling_connection.owner_id as i32),
277 )
278 .add(room_participant::Column::AnsweringConnectionId.is_null()),
279 )
280 .one(&*tx)
281 .await?
282 .context("no call to cancel")?;
283
284 room_participant::Entity::delete(participant.into_active_model())
285 .exec(&*tx)
286 .await?;
287
288 let room = self.get_room(room_id, &tx).await?;
289 Ok(room)
290 })
291 .await
292 }
293
294 pub async fn join_room(
295 &self,
296 room_id: RoomId,
297 user_id: UserId,
298 connection: ConnectionId,
299 ) -> Result<TransactionGuard<JoinRoom>> {
300 self.room_transaction(room_id, |tx| async move {
301 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
302 enum QueryChannelId {
303 ChannelId,
304 }
305
306 let channel_id: Option<ChannelId> = room::Entity::find()
307 .select_only()
308 .column(room::Column::ChannelId)
309 .filter(room::Column::Id.eq(room_id))
310 .into_values::<_, QueryChannelId>()
311 .one(&*tx)
312 .await?
313 .context("no such room")?;
314
315 if channel_id.is_some() {
316 Err(anyhow!("tried to join channel call directly"))?
317 }
318
319 let participant_index = self
320 .get_next_participant_index_internal(room_id, &tx)
321 .await?;
322
323 let result = room_participant::Entity::update_many()
324 .filter(
325 Condition::all()
326 .add(room_participant::Column::RoomId.eq(room_id))
327 .add(room_participant::Column::UserId.eq(user_id))
328 .add(room_participant::Column::AnsweringConnectionId.is_null()),
329 )
330 .set(room_participant::ActiveModel {
331 participant_index: ActiveValue::Set(Some(participant_index)),
332 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
333 answering_connection_server_id: ActiveValue::set(Some(ServerId(
334 connection.owner_id as i32,
335 ))),
336 answering_connection_lost: ActiveValue::set(false),
337 ..Default::default()
338 })
339 .exec(&*tx)
340 .await?;
341 if result.rows_affected == 0 {
342 Err(anyhow!("room does not exist or was already joined"))?;
343 }
344
345 let room = self.get_room(room_id, &tx).await?;
346 Ok(JoinRoom {
347 room,
348 channel: None,
349 })
350 })
351 .await
352 }
353
354 pub async fn stale_room_connection(&self, user_id: UserId) -> Result<Option<ConnectionId>> {
355 self.transaction(|tx| async move {
356 let participant = room_participant::Entity::find()
357 .filter(room_participant::Column::UserId.eq(user_id))
358 .one(&*tx)
359 .await?;
360 Ok(participant.and_then(|p| p.answering_connection()))
361 })
362 .await
363 }
364
365 async fn get_next_participant_index_internal(
366 &self,
367 room_id: RoomId,
368 tx: &DatabaseTransaction,
369 ) -> Result<i32> {
370 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
371 enum QueryParticipantIndices {
372 ParticipantIndex,
373 }
374 let existing_participant_indices: Vec<i32> = room_participant::Entity::find()
375 .filter(
376 room_participant::Column::RoomId
377 .eq(room_id)
378 .and(room_participant::Column::ParticipantIndex.is_not_null()),
379 )
380 .select_only()
381 .column(room_participant::Column::ParticipantIndex)
382 .into_values::<_, QueryParticipantIndices>()
383 .all(tx)
384 .await?;
385
386 let mut participant_index = 0;
387 while existing_participant_indices.contains(&participant_index) {
388 participant_index += 1;
389 }
390
391 Ok(participant_index)
392 }
393
394 /// Returns the channel ID for the given room, if it has one.
395 pub async fn channel_id_for_room(&self, room_id: RoomId) -> Result<Option<ChannelId>> {
396 self.transaction(|tx| async move {
397 let room: Option<room::Model> = room::Entity::find()
398 .filter(room::Column::Id.eq(room_id))
399 .one(&*tx)
400 .await?;
401
402 Ok(room.and_then(|room| room.channel_id))
403 })
404 .await
405 }
406
407 pub(crate) async fn join_channel_room_internal(
408 &self,
409 room_id: RoomId,
410 user_id: UserId,
411 connection: ConnectionId,
412 role: ChannelRole,
413 tx: &DatabaseTransaction,
414 ) -> Result<JoinRoom> {
415 let participant_index = self
416 .get_next_participant_index_internal(room_id, tx)
417 .await?;
418
419 // If someone has been invited into the room, accept the invite instead of inserting
420 let result = room_participant::Entity::update_many()
421 .filter(
422 Condition::all()
423 .add(room_participant::Column::RoomId.eq(room_id))
424 .add(room_participant::Column::UserId.eq(user_id))
425 .add(room_participant::Column::AnsweringConnectionId.is_null()),
426 )
427 .set(room_participant::ActiveModel {
428 participant_index: ActiveValue::Set(Some(participant_index)),
429 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
430 answering_connection_server_id: ActiveValue::set(Some(ServerId(
431 connection.owner_id as i32,
432 ))),
433 answering_connection_lost: ActiveValue::set(false),
434 ..Default::default()
435 })
436 .exec(tx)
437 .await?;
438
439 if result.rows_affected == 0 {
440 room_participant::Entity::insert(room_participant::ActiveModel {
441 room_id: ActiveValue::set(room_id),
442 user_id: ActiveValue::set(user_id),
443 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
444 answering_connection_server_id: ActiveValue::set(Some(ServerId(
445 connection.owner_id as i32,
446 ))),
447 answering_connection_lost: ActiveValue::set(false),
448 calling_user_id: ActiveValue::set(user_id),
449 calling_connection_id: ActiveValue::set(connection.id as i32),
450 calling_connection_server_id: ActiveValue::set(Some(ServerId(
451 connection.owner_id as i32,
452 ))),
453 participant_index: ActiveValue::Set(Some(participant_index)),
454 role: ActiveValue::set(Some(role)),
455 id: ActiveValue::NotSet,
456 location_kind: ActiveValue::NotSet,
457 location_project_id: ActiveValue::NotSet,
458 initial_project_id: ActiveValue::NotSet,
459 })
460 .exec(tx)
461 .await?;
462 }
463
464 let (channel, room) = self.get_channel_room(room_id, tx).await?;
465 let channel = channel.context("no channel for room")?;
466 Ok(JoinRoom {
467 room,
468 channel: Some(channel),
469 })
470 }
471
472 pub async fn rejoin_room(
473 &self,
474 rejoin_room: proto::RejoinRoom,
475 user_id: UserId,
476 connection: ConnectionId,
477 ) -> Result<TransactionGuard<RejoinedRoom>> {
478 let room_id = RoomId::from_proto(rejoin_room.id);
479 self.room_transaction(room_id, |tx| async {
480 let tx = tx;
481 let participant_update = room_participant::Entity::update_many()
482 .filter(
483 Condition::all()
484 .add(room_participant::Column::RoomId.eq(room_id))
485 .add(room_participant::Column::UserId.eq(user_id))
486 .add(room_participant::Column::AnsweringConnectionId.is_not_null()),
487 )
488 .set(room_participant::ActiveModel {
489 answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
490 answering_connection_server_id: ActiveValue::set(Some(ServerId(
491 connection.owner_id as i32,
492 ))),
493 answering_connection_lost: ActiveValue::set(false),
494 ..Default::default()
495 })
496 .exec(&*tx)
497 .await?;
498 if participant_update.rows_affected == 0 {
499 return Err(anyhow!("room does not exist or was already joined"))?;
500 }
501
502 let mut reshared_projects = Vec::new();
503 for reshared_project in &rejoin_room.reshared_projects {
504 let project_id = ProjectId::from_proto(reshared_project.project_id);
505 let project = project::Entity::find_by_id(project_id)
506 .one(&*tx)
507 .await?
508 .context("project does not exist")?;
509 if project.host_user_id != Some(user_id) {
510 return Err(anyhow!("no such project"))?;
511 }
512
513 let mut collaborators = project
514 .find_related(project_collaborator::Entity)
515 .all(&*tx)
516 .await?;
517 let host_ix = collaborators
518 .iter()
519 .position(|collaborator| {
520 collaborator.user_id == user_id && collaborator.is_host
521 })
522 .context("host not found among collaborators")?;
523 let host = collaborators.swap_remove(host_ix);
524 let old_connection_id = host.connection();
525
526 project::Entity::update(project::ActiveModel {
527 host_connection_id: ActiveValue::set(Some(connection.id as i32)),
528 host_connection_server_id: ActiveValue::set(Some(ServerId(
529 connection.owner_id as i32,
530 ))),
531 ..project.into_active_model()
532 })
533 .exec(&*tx)
534 .await?;
535 project_collaborator::Entity::update(project_collaborator::ActiveModel {
536 connection_id: ActiveValue::set(connection.id as i32),
537 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
538 ..host.into_active_model()
539 })
540 .exec(&*tx)
541 .await?;
542
543 self.update_project_worktrees(project_id, &reshared_project.worktrees, &tx)
544 .await?;
545
546 reshared_projects.push(ResharedProject {
547 id: project_id,
548 old_connection_id,
549 collaborators: collaborators
550 .iter()
551 .map(|collaborator| ProjectCollaborator {
552 connection_id: collaborator.connection(),
553 user_id: collaborator.user_id,
554 replica_id: collaborator.replica_id,
555 is_host: collaborator.is_host,
556 committer_name: collaborator.committer_name.clone(),
557 committer_email: collaborator.committer_email.clone(),
558 })
559 .collect(),
560 worktrees: reshared_project.worktrees.clone(),
561 });
562 }
563
564 project::Entity::delete_many()
565 .filter(
566 Condition::all()
567 .add(project::Column::RoomId.eq(room_id))
568 .add(project::Column::HostUserId.eq(user_id))
569 .add(
570 project::Column::Id
571 .is_not_in(reshared_projects.iter().map(|project| project.id)),
572 ),
573 )
574 .exec(&*tx)
575 .await?;
576
577 let mut rejoined_projects = Vec::new();
578 for rejoined_project in &rejoin_room.rejoined_projects {
579 if let Some(rejoined_project) = self
580 .rejoin_project_internal(&tx, rejoined_project, user_id, connection)
581 .await?
582 {
583 rejoined_projects.push(rejoined_project);
584 }
585 }
586
587 let (channel, room) = self.get_channel_room(room_id, &tx).await?;
588
589 Ok(RejoinedRoom {
590 room,
591 channel,
592 rejoined_projects,
593 reshared_projects,
594 })
595 })
596 .await
597 }
598
599 pub async fn rejoin_project_internal(
600 &self,
601 tx: &DatabaseTransaction,
602 rejoined_project: &proto::RejoinProject,
603 user_id: UserId,
604 connection: ConnectionId,
605 ) -> Result<Option<RejoinedProject>> {
606 let project_id = ProjectId::from_proto(rejoined_project.id);
607 let Some(project) = project::Entity::find_by_id(project_id).one(tx).await? else {
608 return Ok(None);
609 };
610
611 let mut worktrees = Vec::new();
612 let db_worktrees = project.find_related(worktree::Entity).all(tx).await?;
613 let db_repos = project
614 .find_related(project_repository::Entity)
615 .all(tx)
616 .await?;
617
618 for db_worktree in db_worktrees {
619 let mut worktree = RejoinedWorktree {
620 id: db_worktree.id as u64,
621 abs_path: db_worktree.abs_path,
622 root_name: db_worktree.root_name,
623 visible: db_worktree.visible,
624 updated_entries: Default::default(),
625 removed_entries: Default::default(),
626 updated_repositories: Default::default(),
627 removed_repositories: Default::default(),
628 diagnostic_summaries: Default::default(),
629 settings_files: Default::default(),
630 scan_id: db_worktree.scan_id as u64,
631 completed_scan_id: db_worktree.completed_scan_id as u64,
632 root_repo_common_dir: db_worktree.root_repo_common_dir,
633 };
634
635 let rejoined_worktree = rejoined_project
636 .worktrees
637 .iter()
638 .find(|worktree| worktree.id == db_worktree.id as u64);
639
640 // File entries
641 {
642 let entry_filter = if let Some(rejoined_worktree) = rejoined_worktree {
643 worktree_entry::Column::ScanId.gt(rejoined_worktree.scan_id)
644 } else {
645 worktree_entry::Column::IsDeleted.eq(false)
646 };
647
648 let mut db_entries = worktree_entry::Entity::find()
649 .filter(
650 Condition::all()
651 .add(worktree_entry::Column::ProjectId.eq(project.id))
652 .add(worktree_entry::Column::WorktreeId.eq(worktree.id))
653 .add(entry_filter),
654 )
655 .stream(tx)
656 .await?;
657
658 while let Some(db_entry) = db_entries.next().await {
659 let db_entry = db_entry?;
660 if db_entry.is_deleted {
661 worktree.removed_entries.push(db_entry.id as u64);
662 } else {
663 worktree.updated_entries.push(proto::Entry {
664 id: db_entry.id as u64,
665 is_dir: db_entry.is_dir,
666 path: db_entry.path,
667 inode: db_entry.inode as u64,
668 mtime: Some(proto::Timestamp {
669 seconds: db_entry.mtime_seconds as u64,
670 nanos: db_entry.mtime_nanos as u32,
671 }),
672 canonical_path: db_entry.canonical_path,
673 is_ignored: db_entry.is_ignored,
674 is_external: db_entry.is_external,
675 is_hidden: db_entry.is_hidden,
676 // This is only used in the summarization backlog, so if it's None,
677 // that just means we won't be able to detect when to resummarize
678 // based on total number of backlogged bytes - instead, we'd go
679 // on number of files only. That shouldn't be a huge deal in practice.
680 size: None,
681 is_fifo: db_entry.is_fifo,
682 });
683 }
684 }
685 }
686
687 worktrees.push(worktree);
688 }
689
690 let mut removed_repositories = Vec::new();
691 let mut updated_repositories = Vec::new();
692 for db_repo in db_repos {
693 let rejoined_repository = rejoined_project
694 .repositories
695 .iter()
696 .find(|repo| repo.id == db_repo.id as u64);
697
698 let repository_filter = if let Some(rejoined_repository) = rejoined_repository {
699 project_repository::Column::ScanId.gt(rejoined_repository.scan_id)
700 } else {
701 project_repository::Column::IsDeleted.eq(false)
702 };
703
704 let db_repositories = project_repository::Entity::find()
705 .filter(
706 Condition::all()
707 .add(project_repository::Column::ProjectId.eq(project.id))
708 .add(repository_filter),
709 )
710 .all(tx)
711 .await?;
712
713 for db_repository in db_repositories.into_iter() {
714 if db_repository.is_deleted {
715 removed_repositories.push(db_repository.id as u64);
716 } else {
717 let status_entry_filter = if let Some(rejoined_repository) = rejoined_repository
718 {
719 project_repository_statuses::Column::ScanId.gt(rejoined_repository.scan_id)
720 } else {
721 project_repository_statuses::Column::IsDeleted.eq(false)
722 };
723
724 let mut db_statuses = project_repository_statuses::Entity::find()
725 .filter(
726 Condition::all()
727 .add(project_repository_statuses::Column::ProjectId.eq(project.id))
728 .add(
729 project_repository_statuses::Column::RepositoryId
730 .eq(db_repository.id),
731 )
732 .add(status_entry_filter),
733 )
734 .stream(tx)
735 .await?;
736 let mut removed_statuses = Vec::new();
737 let mut updated_statuses = Vec::new();
738
739 while let Some(db_status) = db_statuses.next().await {
740 let db_status: project_repository_statuses::Model = db_status?;
741 if db_status.is_deleted {
742 removed_statuses.push(db_status.repo_path.clone());
743 } else {
744 updated_statuses.push(db_status_to_proto(db_status)?);
745 }
746 }
747
748 let current_merge_conflicts = db_repository
749 .current_merge_conflicts
750 .as_ref()
751 .map(|conflicts| serde_json::from_str(conflicts))
752 .transpose()?
753 .unwrap_or_default();
754
755 let branch_summary = db_repository
756 .branch_summary
757 .as_ref()
758 .map(|branch_summary| serde_json::from_str(branch_summary))
759 .transpose()?
760 .unwrap_or_default();
761
762 let head_commit_details = db_repository
763 .head_commit_details
764 .as_ref()
765 .map(|head_commit_details| serde_json::from_str(head_commit_details))
766 .transpose()?
767 .unwrap_or_default();
768
769 let entry_ids = serde_json::from_str(&db_repository.entry_ids)
770 .context("failed to deserialize repository's entry ids")?;
771
772 if let Some(legacy_worktree_id) = db_repository.legacy_worktree_id {
773 if let Some(worktree) = worktrees
774 .iter_mut()
775 .find(|worktree| worktree.id as i64 == legacy_worktree_id)
776 {
777 worktree.updated_repositories.push(proto::RepositoryEntry {
778 repository_id: db_repository.id as u64,
779 updated_statuses,
780 removed_statuses,
781 current_merge_conflicts,
782 branch_summary,
783 });
784 }
785 } else {
786 updated_repositories.push(proto::UpdateRepository {
787 entry_ids,
788 updated_statuses,
789 removed_statuses,
790 current_merge_conflicts,
791 branch_summary,
792 head_commit_details,
793 branch_list: Vec::new(),
794 project_id: project_id.to_proto(),
795 id: db_repository.id as u64,
796 abs_path: db_repository.abs_path.clone(),
797 scan_id: db_repository.scan_id as u64,
798 is_last_update: true,
799 merge_message: db_repository.merge_message,
800 stash_entries: Vec::new(),
801 remote_upstream_url: db_repository.remote_upstream_url.clone(),
802 remote_origin_url: db_repository.remote_origin_url.clone(),
803 original_repo_abs_path: Some(db_repository.abs_path),
804 linked_worktrees: db_repository
805 .linked_worktrees
806 .as_deref()
807 .and_then(|s| serde_json::from_str(s).ok())
808 .unwrap_or_default(),
809 });
810 }
811 }
812 }
813 }
814
815 let language_servers = project
816 .find_related(language_server::Entity)
817 .all(tx)
818 .await?
819 .into_iter()
820 .map(|language_server| LanguageServer {
821 server: proto::LanguageServer {
822 id: language_server.id as u64,
823 name: language_server.name,
824 worktree_id: language_server.worktree_id.map(|id| id as u64),
825 },
826 capabilities: language_server.capabilities,
827 })
828 .collect::<Vec<_>>();
829
830 {
831 let mut db_settings_files = worktree_settings_file::Entity::find()
832 .filter(worktree_settings_file::Column::ProjectId.eq(project_id))
833 .stream(tx)
834 .await?;
835 while let Some(db_settings_file) = db_settings_files.next().await {
836 let db_settings_file = db_settings_file?;
837 if let Some(worktree) = worktrees
838 .iter_mut()
839 .find(|w| w.id == db_settings_file.worktree_id as u64)
840 {
841 worktree.settings_files.push(WorktreeSettingsFile {
842 path: db_settings_file.path,
843 content: db_settings_file.content,
844 kind: db_settings_file.kind,
845 outside_worktree: db_settings_file.outside_worktree,
846 });
847 }
848 }
849 }
850
851 let mut collaborators = project
852 .find_related(project_collaborator::Entity)
853 .all(tx)
854 .await?;
855 let self_collaborator = if let Some(self_collaborator_ix) = collaborators
856 .iter()
857 .position(|collaborator| collaborator.user_id == user_id)
858 {
859 collaborators.swap_remove(self_collaborator_ix)
860 } else {
861 return Ok(None);
862 };
863 let old_connection_id = self_collaborator.connection();
864 project_collaborator::Entity::update(project_collaborator::ActiveModel {
865 connection_id: ActiveValue::set(connection.id as i32),
866 connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
867 ..self_collaborator.into_active_model()
868 })
869 .exec(tx)
870 .await?;
871
872 let collaborators = collaborators
873 .into_iter()
874 .map(|collaborator| ProjectCollaborator {
875 connection_id: collaborator.connection(),
876 user_id: collaborator.user_id,
877 replica_id: collaborator.replica_id,
878 is_host: collaborator.is_host,
879 committer_name: collaborator.committer_name,
880 committer_email: collaborator.committer_email,
881 })
882 .collect::<Vec<_>>();
883
884 Ok(Some(RejoinedProject {
885 id: project_id,
886 old_connection_id,
887 collaborators,
888 updated_repositories,
889 removed_repositories,
890 worktrees,
891 language_servers,
892 }))
893 }
894
895 pub async fn leave_room(
896 &self,
897 connection: ConnectionId,
898 ) -> Result<Option<TransactionGuard<LeftRoom>>> {
899 self.optional_room_transaction(|tx| async move {
900 let leaving_participant = room_participant::Entity::find()
901 .filter(
902 Condition::all()
903 .add(
904 room_participant::Column::AnsweringConnectionId
905 .eq(connection.id as i32),
906 )
907 .add(
908 room_participant::Column::AnsweringConnectionServerId
909 .eq(connection.owner_id as i32),
910 ),
911 )
912 .one(&*tx)
913 .await?;
914
915 if let Some(leaving_participant) = leaving_participant {
916 // Leave room.
917 let room_id = leaving_participant.room_id;
918 room_participant::Entity::delete_by_id(leaving_participant.id)
919 .exec(&*tx)
920 .await?;
921
922 // Cancel pending calls initiated by the leaving user.
923 let called_participants = room_participant::Entity::find()
924 .filter(
925 Condition::all()
926 .add(
927 room_participant::Column::CallingUserId
928 .eq(leaving_participant.user_id),
929 )
930 .add(room_participant::Column::AnsweringConnectionId.is_null()),
931 )
932 .all(&*tx)
933 .await?;
934 room_participant::Entity::delete_many()
935 .filter(
936 room_participant::Column::Id
937 .is_in(called_participants.iter().map(|participant| participant.id)),
938 )
939 .exec(&*tx)
940 .await?;
941 let canceled_calls_to_user_ids = called_participants
942 .into_iter()
943 .map(|participant| participant.user_id)
944 .collect();
945
946 // Detect left projects.
947 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
948 enum QueryProjectIds {
949 ProjectId,
950 }
951 let project_ids: Vec<ProjectId> = project_collaborator::Entity::find()
952 .select_only()
953 .column_as(
954 project_collaborator::Column::ProjectId,
955 QueryProjectIds::ProjectId,
956 )
957 .filter(
958 Condition::all()
959 .add(
960 project_collaborator::Column::ConnectionId.eq(connection.id as i32),
961 )
962 .add(
963 project_collaborator::Column::ConnectionServerId
964 .eq(connection.owner_id as i32),
965 ),
966 )
967 .into_values::<_, QueryProjectIds>()
968 .all(&*tx)
969 .await?;
970
971 let mut left_projects = HashMap::default();
972 let mut collaborators = project_collaborator::Entity::find()
973 .filter(project_collaborator::Column::ProjectId.is_in(project_ids))
974 .stream(&*tx)
975 .await?;
976
977 while let Some(collaborator) = collaborators.next().await {
978 let collaborator = collaborator?;
979 let left_project =
980 left_projects
981 .entry(collaborator.project_id)
982 .or_insert(LeftProject {
983 id: collaborator.project_id,
984 connection_ids: Default::default(),
985 should_unshare: false,
986 });
987
988 let collaborator_connection_id = collaborator.connection();
989 if collaborator_connection_id != connection {
990 left_project.connection_ids.push(collaborator_connection_id);
991 }
992
993 if collaborator.is_host && collaborator.connection() == connection {
994 left_project.should_unshare = true;
995 }
996 }
997 drop(collaborators);
998
999 // Leave projects.
1000 project_collaborator::Entity::delete_many()
1001 .filter(
1002 Condition::all()
1003 .add(
1004 project_collaborator::Column::ConnectionId.eq(connection.id as i32),
1005 )
1006 .add(
1007 project_collaborator::Column::ConnectionServerId
1008 .eq(connection.owner_id as i32),
1009 ),
1010 )
1011 .exec(&*tx)
1012 .await?;
1013
1014 follower::Entity::delete_many()
1015 .filter(
1016 Condition::all()
1017 .add(follower::Column::FollowerConnectionId.eq(connection.id as i32)),
1018 )
1019 .exec(&*tx)
1020 .await?;
1021
1022 // Unshare projects.
1023 project::Entity::delete_many()
1024 .filter(
1025 Condition::all()
1026 .add(project::Column::RoomId.eq(room_id))
1027 .add(project::Column::HostConnectionId.eq(connection.id as i32))
1028 .add(
1029 project::Column::HostConnectionServerId
1030 .eq(connection.owner_id as i32),
1031 ),
1032 )
1033 .exec(&*tx)
1034 .await?;
1035
1036 let (channel, room) = self.get_channel_room(room_id, &tx).await?;
1037 let deleted = if room.participants.is_empty() {
1038 let result = room::Entity::delete_by_id(room_id).exec(&*tx).await?;
1039 result.rows_affected > 0
1040 } else {
1041 false
1042 };
1043
1044 let left_room = LeftRoom {
1045 room,
1046 channel,
1047 left_projects,
1048 canceled_calls_to_user_ids,
1049 deleted,
1050 };
1051
1052 if left_room.room.participants.is_empty() {
1053 self.rooms.remove(&room_id);
1054 }
1055
1056 Ok(Some((room_id, left_room)))
1057 } else {
1058 Ok(None)
1059 }
1060 })
1061 .await
1062 }
1063
1064 /// Updates the location of a participant in the given room.
1065 pub async fn update_room_participant_location(
1066 &self,
1067 room_id: RoomId,
1068 connection: ConnectionId,
1069 location: proto::ParticipantLocation,
1070 ) -> Result<TransactionGuard<proto::Room>> {
1071 self.room_transaction(room_id, |tx| async {
1072 let tx = tx;
1073 let location_kind;
1074 let location_project_id;
1075 match location.variant.as_ref().context("invalid location")? {
1076 proto::participant_location::Variant::SharedProject(project) => {
1077 location_kind = 0;
1078 location_project_id = Some(ProjectId::from_proto(project.id));
1079 }
1080 proto::participant_location::Variant::UnsharedProject(_) => {
1081 location_kind = 1;
1082 location_project_id = None;
1083 }
1084 proto::participant_location::Variant::External(_) => {
1085 location_kind = 2;
1086 location_project_id = None;
1087 }
1088 }
1089
1090 let result = room_participant::Entity::update_many()
1091 .filter(
1092 Condition::all()
1093 .add(room_participant::Column::RoomId.eq(room_id))
1094 .add(
1095 room_participant::Column::AnsweringConnectionId
1096 .eq(connection.id as i32),
1097 )
1098 .add(
1099 room_participant::Column::AnsweringConnectionServerId
1100 .eq(connection.owner_id as i32),
1101 ),
1102 )
1103 .set(room_participant::ActiveModel {
1104 location_kind: ActiveValue::set(Some(location_kind)),
1105 location_project_id: ActiveValue::set(location_project_id),
1106 ..Default::default()
1107 })
1108 .exec(&*tx)
1109 .await?;
1110
1111 if result.rows_affected == 1 {
1112 let room = self.get_room(room_id, &tx).await?;
1113 Ok(room)
1114 } else {
1115 Err(anyhow!("could not update room participant location"))?
1116 }
1117 })
1118 .await
1119 }
1120
1121 /// Sets the role of a participant in the given room.
1122 pub async fn set_room_participant_role(
1123 &self,
1124 admin_id: UserId,
1125 room_id: RoomId,
1126 user_id: UserId,
1127 role: ChannelRole,
1128 ) -> Result<TransactionGuard<proto::Room>> {
1129 self.room_transaction(room_id, |tx| async move {
1130 room_participant::Entity::find()
1131 .filter(
1132 Condition::all()
1133 .add(room_participant::Column::RoomId.eq(room_id))
1134 .add(room_participant::Column::UserId.eq(admin_id))
1135 .add(room_participant::Column::Role.eq(ChannelRole::Admin)),
1136 )
1137 .one(&*tx)
1138 .await?
1139 .context("only admins can set participant role")?;
1140
1141 if role.requires_cla() {
1142 self.check_user_has_signed_cla(user_id, room_id, &tx)
1143 .await?;
1144 }
1145
1146 let result = room_participant::Entity::update_many()
1147 .filter(
1148 Condition::all()
1149 .add(room_participant::Column::RoomId.eq(room_id))
1150 .add(room_participant::Column::UserId.eq(user_id)),
1151 )
1152 .set(room_participant::ActiveModel {
1153 role: ActiveValue::set(Some(role)),
1154 ..Default::default()
1155 })
1156 .exec(&*tx)
1157 .await?;
1158
1159 if result.rows_affected != 1 {
1160 Err(anyhow!("could not update room participant role"))?;
1161 }
1162 self.get_room(room_id, &tx).await
1163 })
1164 .await
1165 }
1166
1167 async fn check_user_has_signed_cla(
1168 &self,
1169 user_id: UserId,
1170 room_id: RoomId,
1171 tx: &DatabaseTransaction,
1172 ) -> Result<()> {
1173 let channel = room::Entity::find_by_id(room_id)
1174 .one(tx)
1175 .await?
1176 .context("could not find room")?
1177 .find_related(channel::Entity)
1178 .one(tx)
1179 .await?;
1180
1181 if let Some(channel) = channel {
1182 let requires_zed_cla = channel.requires_zed_cla
1183 || channel::Entity::find()
1184 .filter(
1185 channel::Column::Id
1186 .is_in(channel.ancestors())
1187 .and(channel::Column::RequiresZedCla.eq(true)),
1188 )
1189 .count(tx)
1190 .await?
1191 > 0;
1192 if requires_zed_cla
1193 && contributor::Entity::find()
1194 .filter(contributor::Column::UserId.eq(user_id))
1195 .one(tx)
1196 .await?
1197 .is_none()
1198 {
1199 Err(anyhow!("user has not signed the Zed CLA"))?;
1200 }
1201 }
1202 Ok(())
1203 }
1204
1205 pub async fn connection_lost(&self, connection: ConnectionId) -> Result<()> {
1206 self.transaction(|tx| async move {
1207 self.room_connection_lost(connection, &tx).await?;
1208 self.channel_buffer_connection_lost(connection, &tx).await?;
1209 Ok(())
1210 })
1211 .await
1212 }
1213
1214 pub async fn room_connection_lost(
1215 &self,
1216 connection: ConnectionId,
1217 tx: &DatabaseTransaction,
1218 ) -> Result<()> {
1219 let participant = room_participant::Entity::find()
1220 .filter(
1221 Condition::all()
1222 .add(room_participant::Column::AnsweringConnectionId.eq(connection.id as i32))
1223 .add(
1224 room_participant::Column::AnsweringConnectionServerId
1225 .eq(connection.owner_id as i32),
1226 ),
1227 )
1228 .one(tx)
1229 .await?;
1230
1231 if let Some(participant) = participant {
1232 room_participant::Entity::update(room_participant::ActiveModel {
1233 answering_connection_lost: ActiveValue::set(true),
1234 ..participant.into_active_model()
1235 })
1236 .exec(tx)
1237 .await?;
1238 }
1239 Ok(())
1240 }
1241
1242 fn build_incoming_call(
1243 room: &proto::Room,
1244 called_user_id: UserId,
1245 ) -> Option<proto::IncomingCall> {
1246 let pending_participant = room
1247 .pending_participants
1248 .iter()
1249 .find(|participant| participant.user_id == called_user_id.to_proto())?;
1250
1251 Some(proto::IncomingCall {
1252 room_id: room.id,
1253 calling_user_id: pending_participant.calling_user_id,
1254 participant_user_ids: room
1255 .participants
1256 .iter()
1257 .map(|participant| participant.user_id)
1258 .collect(),
1259 initial_project: room.participants.iter().find_map(|participant| {
1260 let initial_project_id = pending_participant.initial_project_id?;
1261 participant
1262 .projects
1263 .iter()
1264 .find(|project| project.id == initial_project_id)
1265 .cloned()
1266 }),
1267 })
1268 }
1269
1270 pub async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
1271 let (_, room) = self.get_channel_room(room_id, tx).await?;
1272 Ok(room)
1273 }
1274
1275 pub async fn room_connection_ids(
1276 &self,
1277 room_id: RoomId,
1278 connection_id: ConnectionId,
1279 ) -> Result<TransactionGuard<HashSet<ConnectionId>>> {
1280 self.room_transaction(room_id, |tx| async move {
1281 let mut participants = room_participant::Entity::find()
1282 .filter(room_participant::Column::RoomId.eq(room_id))
1283 .stream(&*tx)
1284 .await?;
1285
1286 let mut is_participant = false;
1287 let mut connection_ids = HashSet::default();
1288 while let Some(participant) = participants.next().await {
1289 let participant = participant?;
1290 if let Some(answering_connection) = participant.answering_connection() {
1291 if answering_connection == connection_id {
1292 is_participant = true;
1293 } else {
1294 connection_ids.insert(answering_connection);
1295 }
1296 }
1297 }
1298
1299 if !is_participant {
1300 Err(anyhow!("not a room participant"))?;
1301 }
1302
1303 Ok(connection_ids)
1304 })
1305 .await
1306 }
1307
1308 async fn get_channel_room(
1309 &self,
1310 room_id: RoomId,
1311 tx: &DatabaseTransaction,
1312 ) -> Result<(Option<channel::Model>, proto::Room)> {
1313 let db_room = room::Entity::find_by_id(room_id)
1314 .one(tx)
1315 .await?
1316 .context("could not find room")?;
1317
1318 let mut db_participants = db_room
1319 .find_related(room_participant::Entity)
1320 .stream(tx)
1321 .await?;
1322 let mut participants = HashMap::default();
1323 let mut pending_participants = Vec::new();
1324 while let Some(db_participant) = db_participants.next().await {
1325 let db_participant = db_participant?;
1326 if let (
1327 Some(answering_connection_id),
1328 Some(answering_connection_server_id),
1329 Some(participant_index),
1330 ) = (
1331 db_participant.answering_connection_id,
1332 db_participant.answering_connection_server_id,
1333 db_participant.participant_index,
1334 ) {
1335 let location = match (
1336 db_participant.location_kind,
1337 db_participant.location_project_id,
1338 ) {
1339 (Some(0), Some(project_id)) => {
1340 Some(proto::participant_location::Variant::SharedProject(
1341 proto::participant_location::SharedProject {
1342 id: project_id.to_proto(),
1343 },
1344 ))
1345 }
1346 (Some(1), _) => Some(proto::participant_location::Variant::UnsharedProject(
1347 Default::default(),
1348 )),
1349 _ => Some(proto::participant_location::Variant::External(
1350 Default::default(),
1351 )),
1352 };
1353
1354 let answering_connection = ConnectionId {
1355 owner_id: answering_connection_server_id.0 as u32,
1356 id: answering_connection_id as u32,
1357 };
1358 participants.insert(
1359 answering_connection,
1360 proto::Participant {
1361 user_id: db_participant.user_id.to_proto(),
1362 peer_id: Some(answering_connection.into()),
1363 projects: Default::default(),
1364 location: Some(proto::ParticipantLocation { variant: location }),
1365 participant_index: participant_index as u32,
1366 role: db_participant.role.unwrap_or(ChannelRole::Member).into(),
1367 },
1368 );
1369 } else {
1370 pending_participants.push(proto::PendingParticipant {
1371 user_id: db_participant.user_id.to_proto(),
1372 calling_user_id: db_participant.calling_user_id.to_proto(),
1373 initial_project_id: db_participant.initial_project_id.map(|id| id.to_proto()),
1374 });
1375 }
1376 }
1377 drop(db_participants);
1378
1379 let db_projects = db_room
1380 .find_related(project::Entity)
1381 .find_with_related(worktree::Entity)
1382 .all(tx)
1383 .await?;
1384
1385 for (db_project, db_worktrees) in db_projects {
1386 let host_connection = db_project.host_connection()?;
1387 if let Some(participant) = participants.get_mut(&host_connection) {
1388 participant.projects.push(proto::ParticipantProject {
1389 id: db_project.id.to_proto(),
1390 worktree_root_names: Default::default(),
1391 });
1392 let project = participant.projects.last_mut().unwrap();
1393
1394 for db_worktree in db_worktrees {
1395 if db_worktree.visible {
1396 project.worktree_root_names.push(db_worktree.root_name);
1397 }
1398 }
1399 }
1400 }
1401
1402 let mut db_followers = db_room.find_related(follower::Entity).stream(tx).await?;
1403 let mut followers = Vec::new();
1404 while let Some(db_follower) = db_followers.next().await {
1405 let db_follower = db_follower?;
1406 followers.push(proto::Follower {
1407 leader_id: Some(db_follower.leader_connection().into()),
1408 follower_id: Some(db_follower.follower_connection().into()),
1409 project_id: db_follower.project_id.to_proto(),
1410 });
1411 }
1412 drop(db_followers);
1413
1414 let channel = if let Some(channel_id) = db_room.channel_id {
1415 Some(self.get_channel_internal(channel_id, tx).await?)
1416 } else {
1417 None
1418 };
1419
1420 Ok((
1421 channel,
1422 proto::Room {
1423 id: db_room.id.to_proto(),
1424 livekit_room: db_room.live_kit_room,
1425 participants: participants.into_values().collect(),
1426 pending_participants,
1427 followers,
1428 },
1429 ))
1430 }
1431}