1#![allow(non_snake_case)]
2
3use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
4use anyhow::{anyhow, Result};
5use async_tungstenite::tungstenite::Message as WebSocketMessage;
6use collections::HashMap;
7use futures::{SinkExt as _, StreamExt as _};
8use prost::Message as _;
9use serde::Serialize;
10use std::any::{Any, TypeId};
11use std::time::Instant;
12use std::{
13 cmp,
14 fmt::Debug,
15 io, iter,
16 time::{Duration, SystemTime, UNIX_EPOCH},
17};
18use std::{fmt, mem};
19
20include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
21
22pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
23 const NAME: &'static str;
24 const PRIORITY: MessagePriority;
25 fn into_envelope(
26 self,
27 id: u32,
28 responding_to: Option<u32>,
29 original_sender_id: Option<PeerId>,
30 ) -> Envelope;
31 fn from_envelope(envelope: Envelope) -> Option<Self>;
32}
33
34pub trait EntityMessage: EnvelopedMessage {
35 type Entity;
36 fn remote_entity_id(&self) -> u64;
37}
38
39pub trait RequestMessage: EnvelopedMessage {
40 type Response: EnvelopedMessage;
41}
42
43pub trait AnyTypedEnvelope: 'static + Send + Sync {
44 fn payload_type_id(&self) -> TypeId;
45 fn payload_type_name(&self) -> &'static str;
46 fn as_any(&self) -> &dyn Any;
47 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
48 fn is_background(&self) -> bool;
49 fn original_sender_id(&self) -> Option<PeerId>;
50 fn sender_id(&self) -> ConnectionId;
51 fn message_id(&self) -> u32;
52}
53
/// Priority class attached to each message type through
/// [`EnvelopedMessage::PRIORITY`].
///
/// The derives make the value printable, copyable and comparable with `==`;
/// they add capabilities only and do not affect existing users.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum MessagePriority {
    /// The message is not reported as background by
    /// [`AnyTypedEnvelope::is_background`].
    Foreground,
    /// The message is reported as background by
    /// [`AnyTypedEnvelope::is_background`].
    Background,
}
58
59impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
60 fn payload_type_id(&self) -> TypeId {
61 TypeId::of::<T>()
62 }
63
64 fn payload_type_name(&self) -> &'static str {
65 T::NAME
66 }
67
68 fn as_any(&self) -> &dyn Any {
69 self
70 }
71
72 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
73 self
74 }
75
76 fn is_background(&self) -> bool {
77 matches!(T::PRIORITY, MessagePriority::Background)
78 }
79
80 fn original_sender_id(&self) -> Option<PeerId> {
81 self.original_sender_id
82 }
83
84 fn sender_id(&self) -> ConnectionId {
85 self.sender_id
86 }
87
88 fn message_id(&self) -> u32 {
89 self.message_id
90 }
91}
92
93impl PeerId {
94 pub fn from_u64(peer_id: u64) -> Self {
95 let owner_id = (peer_id >> 32) as u32;
96 let id = peer_id as u32;
97 Self { owner_id, id }
98 }
99
100 pub fn as_u64(self) -> u64 {
101 ((self.owner_id as u64) << 32) | (self.id as u64)
102 }
103}
104
// `PeerId` is passed and stored by value throughout this file (for example
// `PeerId::as_u64` takes `self`), so mark it `Copy`.
impl Copy for PeerId {}

// Marker impl: required as a supertrait by the `Ord` impl for `PeerId`.
impl Eq for PeerId {}
108
109impl Ord for PeerId {
110 fn cmp(&self, other: &Self) -> cmp::Ordering {
111 self.owner_id
112 .cmp(&other.owner_id)
113 .then_with(|| self.id.cmp(&other.id))
114 }
115}
116
117impl PartialOrd for PeerId {
118 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
119 Some(self.cmp(other))
120 }
121}
122
123impl std::hash::Hash for PeerId {
124 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
125 self.owner_id.hash(state);
126 self.id.hash(state);
127 }
128}
129
130impl fmt::Display for PeerId {
131 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
132 write!(f, "{}/{}", self.owner_id, self.id)
133 }
134}
135
// Lists the protobuf message types handled by this module, each paired with
// the name of a `MessagePriority` variant (`Foreground` or `Background`).
//
// NOTE(review): the `messages!` macro is imported from the parent module and
// its definition is not visible here; presumably it generates the
// `EnvelopedMessage` impl (including `NAME` and `PRIORITY`) for every listed
// type — confirm against the macro definition before reordering entries.
messages!(
    (Ack, Foreground),
    (AckBufferOperation, Background),
    (AckChannelMessage, Background),
    (AddNotification, Foreground),
    (AddProjectCollaborator, Foreground),
    (ApplyCodeAction, Background),
    (ApplyCodeActionResponse, Background),
    (ApplyCompletionAdditionalEdits, Background),
    (ApplyCompletionAdditionalEditsResponse, Background),
    (BufferReloaded, Foreground),
    (BufferSaved, Foreground),
    (Call, Foreground),
    (CallCanceled, Foreground),
    (CancelCall, Foreground),
    (ChannelMessageSent, Foreground),
    (ChannelMessageUpdate, Foreground),
    (CompleteWithLanguageModel, Background),
    (ComputeEmbeddings, Background),
    (ComputeEmbeddingsResponse, Background),
    (CopyProjectEntry, Foreground),
    (CountTokensWithLanguageModel, Background),
    (CountTokensResponse, Background),
    (CreateBufferForPeer, Foreground),
    (CreateChannel, Foreground),
    (CreateChannelResponse, Foreground),
    (CreateProjectEntry, Foreground),
    (CreateRoom, Foreground),
    (CreateRoomResponse, Foreground),
    (DeclineCall, Foreground),
    (DeleteChannel, Foreground),
    (DeleteNotification, Foreground),
    (UpdateNotification, Foreground),
    (DeleteProjectEntry, Foreground),
    (EndStream, Foreground),
    (Error, Foreground),
    (ExpandProjectEntry, Foreground),
    (ExpandProjectEntryResponse, Foreground),
    (Follow, Foreground),
    (FollowResponse, Foreground),
    (FormatBuffers, Foreground),
    (FormatBuffersResponse, Foreground),
    (FuzzySearchUsers, Foreground),
    (GetCachedEmbeddings, Background),
    (GetCachedEmbeddingsResponse, Background),
    (GetChannelMembers, Foreground),
    (GetChannelMembersResponse, Foreground),
    (GetChannelMessages, Background),
    (GetChannelMessagesById, Background),
    (GetChannelMessagesResponse, Background),
    (GetCodeActions, Background),
    (GetCodeActionsResponse, Background),
    (GetCompletions, Background),
    (GetCompletionsResponse, Background),
    (GetDefinition, Background),
    (GetDefinitionResponse, Background),
    (GetDocumentHighlights, Background),
    (GetDocumentHighlightsResponse, Background),
    (GetHover, Background),
    (GetHoverResponse, Background),
    (GetNotifications, Foreground),
    (GetNotificationsResponse, Foreground),
    (GetPrivateUserInfo, Foreground),
    (GetPrivateUserInfoResponse, Foreground),
    (GetProjectSymbols, Background),
    (GetProjectSymbolsResponse, Background),
    (GetReferences, Background),
    (GetReferencesResponse, Background),
    (GetTypeDefinition, Background),
    (GetTypeDefinitionResponse, Background),
    (GetImplementation, Background),
    (GetImplementationResponse, Background),
    (GetUsers, Foreground),
    (Hello, Foreground),
    (IncomingCall, Foreground),
    (InlayHints, Background),
    (InlayHintsResponse, Background),
    (InviteChannelMember, Foreground),
    (JoinChannel, Foreground),
    (JoinChannelBuffer, Foreground),
    (JoinChannelBufferResponse, Foreground),
    (JoinChannelChat, Foreground),
    (JoinChannelChatResponse, Foreground),
    (JoinProject, Foreground),
    (JoinHostedProject, Foreground),
    (JoinProjectResponse, Foreground),
    (JoinRoom, Foreground),
    (JoinRoomResponse, Foreground),
    (LanguageModelResponse, Background),
    (LeaveChannelBuffer, Background),
    (LeaveChannelChat, Foreground),
    (LeaveProject, Foreground),
    (LeaveRoom, Foreground),
    (MarkNotificationRead, Foreground),
    (MoveChannel, Foreground),
    (OnTypeFormatting, Background),
    (OnTypeFormattingResponse, Background),
    (OpenBufferById, Background),
    (OpenBufferByPath, Background),
    (OpenBufferForSymbol, Background),
    (OpenBufferForSymbolResponse, Background),
    (OpenBufferResponse, Background),
    (PerformRename, Background),
    (PerformRenameResponse, Background),
    (Ping, Foreground),
    (PrepareRename, Background),
    (PrepareRenameResponse, Background),
    (ProjectEntryResponse, Foreground),
    (RefreshInlayHints, Foreground),
    (RejoinChannelBuffers, Foreground),
    (RejoinChannelBuffersResponse, Foreground),
    (RejoinRoom, Foreground),
    (RejoinRoomResponse, Foreground),
    (ReloadBuffers, Foreground),
    (ReloadBuffersResponse, Foreground),
    (RemoveChannelMember, Foreground),
    (RemoveChannelMessage, Foreground),
    (UpdateChannelMessage, Foreground),
    (RemoveContact, Foreground),
    (RemoveProjectCollaborator, Foreground),
    (RenameChannel, Foreground),
    (RenameChannelResponse, Foreground),
    (RenameProjectEntry, Foreground),
    (RequestContact, Foreground),
    (ResolveCompletionDocumentation, Background),
    (ResolveCompletionDocumentationResponse, Background),
    (ResolveInlayHint, Background),
    (ResolveInlayHintResponse, Background),
    (RespondToChannelInvite, Foreground),
    (RespondToContactRequest, Foreground),
    (RoomUpdated, Foreground),
    (SaveBuffer, Foreground),
    (SetChannelMemberRole, Foreground),
    (SetChannelVisibility, Foreground),
    (SearchProject, Background),
    (SearchProjectResponse, Background),
    (SendChannelMessage, Background),
    (SendChannelMessageResponse, Background),
    (ShareProject, Foreground),
    (ShareProjectResponse, Foreground),
    (ShowContacts, Foreground),
    (StartLanguageServer, Foreground),
    (SynchronizeBuffers, Foreground),
    (SynchronizeBuffersResponse, Foreground),
    (Test, Foreground),
    (Unfollow, Foreground),
    (UnshareProject, Foreground),
    (UpdateBuffer, Foreground),
    (UpdateBufferFile, Foreground),
    (UpdateChannelBuffer, Foreground),
    (UpdateChannelBufferCollaborators, Foreground),
    (UpdateChannels, Foreground),
    (UpdateUserChannels, Foreground),
    (UpdateContacts, Foreground),
    (UpdateDiagnosticSummary, Foreground),
    (UpdateDiffBase, Foreground),
    (UpdateFollowers, Foreground),
    (UpdateInviteInfo, Foreground),
    (UpdateLanguageServer, Foreground),
    (UpdateParticipantLocation, Foreground),
    (UpdateProject, Foreground),
    (UpdateProjectCollaborator, Foreground),
    (UpdateWorktree, Foreground),
    (UpdateWorktreeSettings, Foreground),
    (UsersResponse, Foreground),
    (LspExtExpandMacro, Background),
    (LspExtExpandMacroResponse, Background),
    (SetRoomParticipantRole, Foreground),
    (BlameBuffer, Foreground),
    (BlameBufferResponse, Foreground),
    (CreateRemoteProject, Foreground),
    (CreateRemoteProjectResponse, Foreground),
    (CreateDevServer, Foreground),
    (CreateDevServerResponse, Foreground),
    (DevServerInstructions, Foreground),
    (ShutdownDevServer, Foreground),
    (ReconnectDevServer, Foreground),
    (ReconnectDevServerResponse, Foreground),
    (ShareRemoteProject, Foreground),
    (JoinRemoteProject, Foreground),
    (RejoinRemoteProjects, Foreground),
    (RejoinRemoteProjectsResponse, Foreground),
    (MultiLspQuery, Background),
    (MultiLspQueryResponse, Background),
);
321
// Pairs each request message type (first element) with the message type used
// to answer it (second element). Several requests share a response type, for
// example `Ack`.
//
// NOTE(review): the `request_messages!` macro is imported from the parent
// module and its definition is not visible here; presumably it implements
// `RequestMessage` for the first type with `Response` set to the second —
// confirm against the macro definition.
request_messages!(
    (ApplyCodeAction, ApplyCodeActionResponse),
    (
        ApplyCompletionAdditionalEdits,
        ApplyCompletionAdditionalEditsResponse
    ),
    (Call, Ack),
    (CancelCall, Ack),
    (CopyProjectEntry, ProjectEntryResponse),
    (CompleteWithLanguageModel, LanguageModelResponse),
    (ComputeEmbeddings, ComputeEmbeddingsResponse),
    (CountTokensWithLanguageModel, CountTokensResponse),
    (CreateChannel, CreateChannelResponse),
    (CreateProjectEntry, ProjectEntryResponse),
    (CreateRoom, CreateRoomResponse),
    (DeclineCall, Ack),
    (DeleteChannel, Ack),
    (DeleteProjectEntry, ProjectEntryResponse),
    (ExpandProjectEntry, ExpandProjectEntryResponse),
    (Follow, FollowResponse),
    (FormatBuffers, FormatBuffersResponse),
    (FuzzySearchUsers, UsersResponse),
    (GetCachedEmbeddings, GetCachedEmbeddingsResponse),
    (GetChannelMembers, GetChannelMembersResponse),
    (GetChannelMessages, GetChannelMessagesResponse),
    (GetChannelMessagesById, GetChannelMessagesResponse),
    (GetCodeActions, GetCodeActionsResponse),
    (GetCompletions, GetCompletionsResponse),
    (GetDefinition, GetDefinitionResponse),
    (GetImplementation, GetImplementationResponse),
    (GetDocumentHighlights, GetDocumentHighlightsResponse),
    (GetHover, GetHoverResponse),
    (GetNotifications, GetNotificationsResponse),
    (GetPrivateUserInfo, GetPrivateUserInfoResponse),
    (GetProjectSymbols, GetProjectSymbolsResponse),
    (GetReferences, GetReferencesResponse),
    (GetTypeDefinition, GetTypeDefinitionResponse),
    (GetUsers, UsersResponse),
    (IncomingCall, Ack),
    (InlayHints, InlayHintsResponse),
    (InviteChannelMember, Ack),
    (JoinChannel, JoinRoomResponse),
    (JoinChannelBuffer, JoinChannelBufferResponse),
    (JoinChannelChat, JoinChannelChatResponse),
    (JoinHostedProject, JoinProjectResponse),
    (JoinProject, JoinProjectResponse),
    (JoinRoom, JoinRoomResponse),
    (LeaveChannelBuffer, Ack),
    (LeaveRoom, Ack),
    (MarkNotificationRead, Ack),
    (MoveChannel, Ack),
    (OnTypeFormatting, OnTypeFormattingResponse),
    (OpenBufferById, OpenBufferResponse),
    (OpenBufferByPath, OpenBufferResponse),
    (OpenBufferForSymbol, OpenBufferForSymbolResponse),
    (PerformRename, PerformRenameResponse),
    (Ping, Ack),
    (PrepareRename, PrepareRenameResponse),
    (RefreshInlayHints, Ack),
    (RejoinChannelBuffers, RejoinChannelBuffersResponse),
    (RejoinRoom, RejoinRoomResponse),
    (ReloadBuffers, ReloadBuffersResponse),
    (RemoveChannelMember, Ack),
    (RemoveChannelMessage, Ack),
    (UpdateChannelMessage, Ack),
    (RemoveContact, Ack),
    (RenameChannel, RenameChannelResponse),
    (RenameProjectEntry, ProjectEntryResponse),
    (RequestContact, Ack),
    (
        ResolveCompletionDocumentation,
        ResolveCompletionDocumentationResponse
    ),
    (ResolveInlayHint, ResolveInlayHintResponse),
    (RespondToChannelInvite, Ack),
    (RespondToContactRequest, Ack),
    (SaveBuffer, BufferSaved),
    (SearchProject, SearchProjectResponse),
    (SendChannelMessage, SendChannelMessageResponse),
    (SetChannelMemberRole, Ack),
    (SetChannelVisibility, Ack),
    (ShareProject, ShareProjectResponse),
    (SynchronizeBuffers, SynchronizeBuffersResponse),
    (Test, Test),
    (UpdateBuffer, Ack),
    (UpdateParticipantLocation, Ack),
    (UpdateProject, Ack),
    (UpdateWorktree, Ack),
    (LspExtExpandMacro, LspExtExpandMacroResponse),
    (SetRoomParticipantRole, Ack),
    (BlameBuffer, BlameBufferResponse),
    (CreateRemoteProject, CreateRemoteProjectResponse),
    (CreateDevServer, CreateDevServerResponse),
    (ShutdownDevServer, Ack),
    (ShareRemoteProject, ShareProjectResponse),
    (JoinRemoteProject, JoinProjectResponse),
    (RejoinRemoteProjects, RejoinRemoteProjectsResponse),
    (ReconnectDevServer, ReconnectDevServerResponse),
    (MultiLspQuery, MultiLspQueryResponse),
);
422
// Message types associated with a project. The braced header names a field
// (`project_id`) and a type (`ShareProject`); the remaining entries are the
// message types the header applies to.
//
// NOTE(review): the `entity_messages!` macro is imported from the parent
// module and its definition is not visible here; presumably it implements
// `EntityMessage` for each listed type, with `remote_entity_id` reading the
// named field and `Entity` set to the named type — confirm against the macro.
entity_messages!(
    {project_id, ShareProject},
    AddProjectCollaborator,
    ApplyCodeAction,
    ApplyCompletionAdditionalEdits,
    BlameBuffer,
    BufferReloaded,
    BufferSaved,
    CopyProjectEntry,
    CreateBufferForPeer,
    CreateProjectEntry,
    DeleteProjectEntry,
    ExpandProjectEntry,
    FormatBuffers,
    GetCodeActions,
    GetCompletions,
    GetDefinition,
    GetImplementation,
    GetDocumentHighlights,
    GetHover,
    GetProjectSymbols,
    GetReferences,
    GetTypeDefinition,
    InlayHints,
    JoinProject,
    LeaveProject,
    MultiLspQuery,
    OnTypeFormatting,
    OpenBufferById,
    OpenBufferByPath,
    OpenBufferForSymbol,
    PerformRename,
    PrepareRename,
    RefreshInlayHints,
    ReloadBuffers,
    RemoveProjectCollaborator,
    RenameProjectEntry,
    ResolveCompletionDocumentation,
    ResolveInlayHint,
    SaveBuffer,
    SearchProject,
    StartLanguageServer,
    SynchronizeBuffers,
    UnshareProject,
    UpdateBuffer,
    UpdateBufferFile,
    UpdateDiagnosticSummary,
    UpdateDiffBase,
    UpdateLanguageServer,
    UpdateProject,
    UpdateProjectCollaborator,
    UpdateWorktree,
    UpdateWorktreeSettings,
    LspExtExpandMacro,
);
478
// Message types associated with a channel. The braced header names a field
// (`channel_id`) and a type (`Channel`); the remaining entries are the
// message types the header applies to.
//
// NOTE(review): the `entity_messages!` macro is defined outside this file;
// presumably it implements `EntityMessage` for each listed type using the
// named field and type — confirm against the macro definition.
entity_messages!(
    {channel_id, Channel},
    ChannelMessageSent,
    ChannelMessageUpdate,
    RemoveChannelMessage,
    UpdateChannelMessage,
    UpdateChannelBuffer,
    UpdateChannelBufferCollaborators,
);
488
/// Number of bytes in one kibibyte (1024).
const KIB: usize = 1 << 10;
/// Number of bytes in one mebibyte (1024 KiB).
const MIB: usize = KIB << 10;
/// Largest capacity, in bytes, that `MessageStream` lets its scratch buffer
/// keep after a message has been written or read.
const MAX_BUFFER_LEN: usize = MIB;
492
/// Wraps a WebSocket sink and/or stream `S` so that protobuf messages can be
/// written to and read from it.
pub struct MessageStream<S> {
    /// The wrapped transport.
    stream: S,
    /// Scratch buffer reused across calls: `write` fills it with the encoded
    /// envelope before compression, `read` fills it with the decompressed
    /// bytes before decoding.
    encoding_buffer: Vec<u8>,
}
498
499#[allow(clippy::large_enum_variant)]
500#[derive(Debug)]
501pub enum Message {
502 Envelope(Envelope),
503 Ping,
504 Pong,
505}
506
507impl<S> MessageStream<S> {
508 pub fn new(stream: S) -> Self {
509 Self {
510 stream,
511 encoding_buffer: Vec::new(),
512 }
513 }
514
515 pub fn inner_mut(&mut self) -> &mut S {
516 &mut self.stream
517 }
518}
519
520impl<S> MessageStream<S>
521where
522 S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
523{
524 pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
525 #[cfg(any(test, feature = "test-support"))]
526 const COMPRESSION_LEVEL: i32 = -7;
527
528 #[cfg(not(any(test, feature = "test-support")))]
529 const COMPRESSION_LEVEL: i32 = 4;
530
531 match message {
532 Message::Envelope(message) => {
533 self.encoding_buffer.reserve(message.encoded_len());
534 message
535 .encode(&mut self.encoding_buffer)
536 .map_err(io::Error::from)?;
537 let buffer =
538 zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
539 .unwrap();
540
541 self.encoding_buffer.clear();
542 self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
543 self.stream.send(WebSocketMessage::Binary(buffer)).await?;
544 }
545 Message::Ping => {
546 self.stream
547 .send(WebSocketMessage::Ping(Default::default()))
548 .await?;
549 }
550 Message::Pong => {
551 self.stream
552 .send(WebSocketMessage::Pong(Default::default()))
553 .await?;
554 }
555 }
556
557 Ok(())
558 }
559}
560
561impl<S> MessageStream<S>
562where
563 S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
564{
565 pub async fn read(&mut self) -> Result<(Message, Instant), anyhow::Error> {
566 while let Some(bytes) = self.stream.next().await {
567 let received_at = Instant::now();
568 match bytes? {
569 WebSocketMessage::Binary(bytes) => {
570 zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
571 let envelope = Envelope::decode(self.encoding_buffer.as_slice())
572 .map_err(io::Error::from)?;
573
574 self.encoding_buffer.clear();
575 self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
576 return Ok((Message::Envelope(envelope), received_at));
577 }
578 WebSocketMessage::Ping(_) => return Ok((Message::Ping, received_at)),
579 WebSocketMessage::Pong(_) => return Ok((Message::Pong, received_at)),
580 WebSocketMessage::Close(_) => break,
581 _ => {}
582 }
583 }
584 Err(anyhow!("connection closed"))
585 }
586}
587
588impl From<Timestamp> for SystemTime {
589 fn from(val: Timestamp) -> Self {
590 UNIX_EPOCH
591 .checked_add(Duration::new(val.seconds, val.nanos))
592 .unwrap()
593 }
594}
595
596impl From<SystemTime> for Timestamp {
597 fn from(time: SystemTime) -> Self {
598 let duration = time.duration_since(UNIX_EPOCH).unwrap();
599 Self {
600 seconds: duration.as_secs(),
601 nanos: duration.subsec_nanos(),
602 }
603 }
604}
605
606impl From<u128> for Nonce {
607 fn from(nonce: u128) -> Self {
608 let upper_half = (nonce >> 64) as u64;
609 let lower_half = nonce as u64;
610 Self {
611 upper_half,
612 lower_half,
613 }
614 }
615}
616
617impl From<Nonce> for u128 {
618 fn from(nonce: Nonce) -> Self {
619 let upper_half = (nonce.upper_half as u128) << 64;
620 let lower_half = nonce.lower_half as u128;
621 upper_half | lower_half
622 }
623}
624
625pub fn split_worktree_update(
626 mut message: UpdateWorktree,
627 max_chunk_size: usize,
628) -> impl Iterator<Item = UpdateWorktree> {
629 let mut done_files = false;
630
631 let mut repository_map = message
632 .updated_repositories
633 .into_iter()
634 .map(|repo| (repo.work_directory_id, repo))
635 .collect::<HashMap<_, _>>();
636
637 iter::from_fn(move || {
638 if done_files {
639 return None;
640 }
641
642 let updated_entries_chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
643 let updated_entries: Vec<_> = message
644 .updated_entries
645 .drain(..updated_entries_chunk_size)
646 .collect();
647
648 let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size);
649 let removed_entries = message
650 .removed_entries
651 .drain(..removed_entries_chunk_size)
652 .collect();
653
654 done_files = message.updated_entries.is_empty() && message.removed_entries.is_empty();
655
656 let mut updated_repositories = Vec::new();
657
658 if !repository_map.is_empty() {
659 for entry in &updated_entries {
660 if let Some(repo) = repository_map.remove(&entry.id) {
661 updated_repositories.push(repo)
662 }
663 }
664 }
665
666 let removed_repositories = if done_files {
667 mem::take(&mut message.removed_repositories)
668 } else {
669 Default::default()
670 };
671
672 if done_files {
673 updated_repositories.extend(mem::take(&mut repository_map).into_values());
674 }
675
676 Some(UpdateWorktree {
677 project_id: message.project_id,
678 worktree_id: message.worktree_id,
679 root_name: message.root_name.clone(),
680 abs_path: message.abs_path.clone(),
681 updated_entries,
682 removed_entries,
683 scan_id: message.scan_id,
684 is_last_update: done_files && message.is_last_update,
685 updated_repositories,
686 removed_repositories,
687 })
688 })
689}
690
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an `UpdateWorktree` envelope whose `root_name` is `"abcdefg"`
    /// repeated `repetitions` times, to control the encoded size.
    fn worktree_envelope(repetitions: usize) -> Message {
        let update = UpdateWorktree {
            root_name: "abcdefg".repeat(repetitions),
            ..Default::default()
        };
        Message::Envelope(Envelope {
            payload: Some(envelope::Payload::UpdateWorktree(update)),
            ..Default::default()
        })
    }

    /// The scratch buffer must never retain more than `MAX_BUFFER_LEN` bytes
    /// of capacity after a write or a read, for small and large messages.
    #[gpui::test]
    async fn test_buffer_size() {
        let (tx, rx) = futures::channel::mpsc::unbounded();

        let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
        for repetitions in [10, 1000000] {
            sink.write(worktree_envelope(repetitions)).await.unwrap();
            assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
        }

        let mut stream = MessageStream::new(rx.map(anyhow::Ok));
        for _ in 0..2 {
            stream.read().await.unwrap();
            assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
        }
    }

    /// Packing a `PeerId` into a `u64` and unpacking it again must round-trip,
    /// including at the `u32::MAX` boundary of either field.
    #[gpui::test]
    fn test_converting_peer_id_from_and_to_u64() {
        let cases = [
            (10, 3),
            (u32::MAX, 3),
            (10, u32::MAX),
            (u32::MAX, u32::MAX),
        ];
        for (owner_id, id) in cases {
            let peer_id = PeerId { owner_id, id };
            assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
        }
    }
}