1#![allow(non_snake_case)]
2
3use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
4use anyhow::{anyhow, Result};
5use async_tungstenite::tungstenite::Message as WebSocketMessage;
6use collections::HashMap;
7use futures::{SinkExt as _, StreamExt as _};
8use prost::Message as _;
9use serde::Serialize;
10use std::any::{Any, TypeId};
11use std::time::Instant;
12use std::{
13 cmp,
14 fmt::Debug,
15 io, iter,
16 time::{Duration, SystemTime, UNIX_EPOCH},
17};
18use std::{fmt, mem};
19
20include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
21
/// A message type that can be wrapped in, and recovered from, an `Envelope`.
///
/// NOTE(review): no implementations are written out in this file; they are presumably
/// generated by the `messages!` macro imported from the parent module — confirm.
pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
    /// Name of the message type; exposed via `AnyTypedEnvelope::payload_type_name`.
    const NAME: &'static str;
    /// Priority class of the message; exposed via `AnyTypedEnvelope::is_background`.
    const PRIORITY: MessagePriority;
    /// Consumes the message and produces an `Envelope` for it.
    ///
    /// NOTE(review): `id`, `responding_to` and `original_sender_id` are presumably
    /// stored as the envelope's metadata — verify against the implementations.
    fn into_envelope(
        self,
        id: u32,
        responding_to: Option<u32>,
        original_sender_id: Option<PeerId>,
    ) -> Envelope;
    /// Tries to extract a message of this type from `envelope`.
    ///
    /// NOTE(review): `None` presumably means the envelope carries a different
    /// payload type — verify against the implementations.
    fn from_envelope(envelope: Envelope) -> Option<Self>;
}
33
/// An enveloped message that is addressed to a particular remote entity.
///
/// NOTE(review): implementations are presumably generated by `entity_messages!` — confirm.
pub trait EntityMessage: EnvelopedMessage {
    /// The kind of entity this message is associated with.
    type Entity;
    /// Returns the id of the remote entity this message refers to.
    fn remote_entity_id(&self) -> u64;
}
38
/// An enveloped message that has an associated response message type.
///
/// NOTE(review): implementations are presumably generated by `request_messages!` — confirm.
pub trait RequestMessage: EnvelopedMessage {
    /// The message type associated with this request as its response.
    type Response: EnvelopedMessage;
}
42
/// A type-erased view of a `TypedEnvelope<T>`.
///
/// It lets callers inspect an envelope's metadata and payload type without knowing
/// the concrete payload type, and downcast through `Any` when they do.
pub trait AnyTypedEnvelope: 'static + Send + Sync {
    /// `TypeId` of the payload type.
    fn payload_type_id(&self) -> TypeId;
    /// Name of the payload type.
    fn payload_type_name(&self) -> &'static str;
    /// Borrows the envelope as `&dyn Any` so it can be downcast by reference.
    fn as_any(&self) -> &dyn Any;
    /// Converts the boxed envelope into a boxed `Any` so it can be downcast by value.
    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
    /// Whether the payload type has `MessagePriority::Background` priority.
    fn is_background(&self) -> bool;
    /// The original sender recorded on the envelope, if any.
    fn original_sender_id(&self) -> Option<PeerId>;
    /// The connection recorded on the envelope as its sender.
    fn sender_id(&self) -> ConnectionId;
    /// The message id recorded on the envelope.
    fn message_id(&self) -> u32;
}
53
/// Priority class assigned to each message type through `EnvelopedMessage::PRIORITY`.
///
/// NOTE(review): how the two classes are scheduled is decided by code outside this
/// file; only `AnyTypedEnvelope::is_background` consumes the value here.
pub enum MessagePriority {
    /// The message is not a background message.
    Foreground,
    /// The message is a background message (`is_background` returns `true`).
    Background,
}
58
59impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
60 fn payload_type_id(&self) -> TypeId {
61 TypeId::of::<T>()
62 }
63
64 fn payload_type_name(&self) -> &'static str {
65 T::NAME
66 }
67
68 fn as_any(&self) -> &dyn Any {
69 self
70 }
71
72 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
73 self
74 }
75
76 fn is_background(&self) -> bool {
77 matches!(T::PRIORITY, MessagePriority::Background)
78 }
79
80 fn original_sender_id(&self) -> Option<PeerId> {
81 self.original_sender_id
82 }
83
84 fn sender_id(&self) -> ConnectionId {
85 self.sender_id
86 }
87
88 fn message_id(&self) -> u32 {
89 self.message_id
90 }
91}
92
93impl PeerId {
94 pub fn from_u64(peer_id: u64) -> Self {
95 let owner_id = (peer_id >> 32) as u32;
96 let id = peer_id as u32;
97 Self { owner_id, id }
98 }
99
100 pub fn as_u64(self) -> u64 {
101 ((self.owner_id as u64) << 32) | (self.id as u64)
102 }
103}
104
// `Copy` and `Eq` are marker impls with no methods; they are written by hand here.
// NOTE(review): presumably this is because `PeerId` is declared in the generated
// `zed.messages.rs` included above, where these derives are not applied — confirm.
impl Copy for PeerId {}

impl Eq for PeerId {}
108
109impl Ord for PeerId {
110 fn cmp(&self, other: &Self) -> cmp::Ordering {
111 self.owner_id
112 .cmp(&other.owner_id)
113 .then_with(|| self.id.cmp(&other.id))
114 }
115}
116
117impl PartialOrd for PeerId {
118 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
119 Some(self.cmp(other))
120 }
121}
122
123impl std::hash::Hash for PeerId {
124 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
125 self.owner_id.hash(state);
126 self.id.hash(state);
127 }
128}
129
130impl fmt::Display for PeerId {
131 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
132 write!(f, "{}/{}", self.owner_id, self.id)
133 }
134}
135
// Registers every protocol message type together with its priority.
//
// Each entry is `(MessageType, Priority)`, where the priority names a
// `MessagePriority` variant (`Foreground` or `Background`).
//
// NOTE(review): the `messages!` macro is imported from the parent module, so its
// expansion is not visible here. It presumably implements `EnvelopedMessage` for
// each listed type — confirm against the macro definition before relying on that.
messages!(
    (Ack, Foreground),
    (AckBufferOperation, Background),
    (AckChannelMessage, Background),
    (AddNotification, Foreground),
    (AddProjectCollaborator, Foreground),
    (ApplyCodeAction, Background),
    (ApplyCodeActionResponse, Background),
    (ApplyCompletionAdditionalEdits, Background),
    (ApplyCompletionAdditionalEditsResponse, Background),
    (BufferReloaded, Foreground),
    (BufferSaved, Foreground),
    (Call, Foreground),
    (CallCanceled, Foreground),
    (CancelCall, Foreground),
    (ChannelMessageSent, Foreground),
    (ChannelMessageUpdate, Foreground),
    (CompleteWithLanguageModel, Background),
    (ComputeEmbeddings, Background),
    (ComputeEmbeddingsResponse, Background),
    (CopyProjectEntry, Foreground),
    (CountTokensWithLanguageModel, Background),
    (CountTokensResponse, Background),
    (CreateBufferForPeer, Foreground),
    (CreateChannel, Foreground),
    (CreateChannelResponse, Foreground),
    (CreateProjectEntry, Foreground),
    (CreateRoom, Foreground),
    (CreateRoomResponse, Foreground),
    (DeclineCall, Foreground),
    (DeleteChannel, Foreground),
    (DeleteNotification, Foreground),
    (UpdateNotification, Foreground),
    (DeleteProjectEntry, Foreground),
    (EndStream, Foreground),
    (Error, Foreground),
    (ExpandProjectEntry, Foreground),
    (ExpandProjectEntryResponse, Foreground),
    (Follow, Foreground),
    (FollowResponse, Foreground),
    (FormatBuffers, Foreground),
    (FormatBuffersResponse, Foreground),
    (FuzzySearchUsers, Foreground),
    (GetCachedEmbeddings, Background),
    (GetCachedEmbeddingsResponse, Background),
    (GetChannelMembers, Foreground),
    (GetChannelMembersResponse, Foreground),
    (GetChannelMessages, Background),
    (GetChannelMessagesById, Background),
    (GetChannelMessagesResponse, Background),
    (GetCodeActions, Background),
    (GetCodeActionsResponse, Background),
    (GetCompletions, Background),
    (GetCompletionsResponse, Background),
    (GetDefinition, Background),
    (GetDefinitionResponse, Background),
    (GetDocumentHighlights, Background),
    (GetDocumentHighlightsResponse, Background),
    (GetHover, Background),
    (GetHoverResponse, Background),
    (GetNotifications, Foreground),
    (GetNotificationsResponse, Foreground),
    (GetPrivateUserInfo, Foreground),
    (GetPrivateUserInfoResponse, Foreground),
    (GetProjectSymbols, Background),
    (GetProjectSymbolsResponse, Background),
    (GetReferences, Background),
    (GetReferencesResponse, Background),
    (GetSupermavenApiKey, Background),
    (GetSupermavenApiKeyResponse, Background),
    (GetTypeDefinition, Background),
    (GetTypeDefinitionResponse, Background),
    (GetImplementation, Background),
    (GetImplementationResponse, Background),
    (GetUsers, Foreground),
    (Hello, Foreground),
    (IncomingCall, Foreground),
    (InlayHints, Background),
    (InlayHintsResponse, Background),
    (InviteChannelMember, Foreground),
    (JoinChannel, Foreground),
    (JoinChannelBuffer, Foreground),
    (JoinChannelBufferResponse, Foreground),
    (JoinChannelChat, Foreground),
    (JoinChannelChatResponse, Foreground),
    (JoinProject, Foreground),
    (JoinHostedProject, Foreground),
    (JoinProjectResponse, Foreground),
    (JoinRoom, Foreground),
    (JoinRoomResponse, Foreground),
    (LanguageModelResponse, Background),
    (LeaveChannelBuffer, Background),
    (LeaveChannelChat, Foreground),
    (LeaveProject, Foreground),
    (LeaveRoom, Foreground),
    (MarkNotificationRead, Foreground),
    (MoveChannel, Foreground),
    (OnTypeFormatting, Background),
    (OnTypeFormattingResponse, Background),
    (OpenBufferById, Background),
    (OpenBufferByPath, Background),
    (OpenBufferForSymbol, Background),
    (OpenBufferForSymbolResponse, Background),
    (OpenBufferResponse, Background),
    (PerformRename, Background),
    (PerformRenameResponse, Background),
    (Ping, Foreground),
    (PrepareRename, Background),
    (PrepareRenameResponse, Background),
    (ProjectEntryResponse, Foreground),
    (RefreshInlayHints, Foreground),
    (RejoinChannelBuffers, Foreground),
    (RejoinChannelBuffersResponse, Foreground),
    (RejoinRoom, Foreground),
    (RejoinRoomResponse, Foreground),
    (ReloadBuffers, Foreground),
    (ReloadBuffersResponse, Foreground),
    (RemoveChannelMember, Foreground),
    (RemoveChannelMessage, Foreground),
    (UpdateChannelMessage, Foreground),
    (RemoveContact, Foreground),
    (RemoveProjectCollaborator, Foreground),
    (RenameChannel, Foreground),
    (RenameChannelResponse, Foreground),
    (RenameProjectEntry, Foreground),
    (RequestContact, Foreground),
    (ResolveCompletionDocumentation, Background),
    (ResolveCompletionDocumentationResponse, Background),
    (ResolveInlayHint, Background),
    (ResolveInlayHintResponse, Background),
    (RespondToChannelInvite, Foreground),
    (RespondToContactRequest, Foreground),
    (RoomUpdated, Foreground),
    (SaveBuffer, Foreground),
    (SetChannelMemberRole, Foreground),
    (SetChannelVisibility, Foreground),
    (SearchProject, Background),
    (SearchProjectResponse, Background),
    (SendChannelMessage, Background),
    (SendChannelMessageResponse, Background),
    (ShareProject, Foreground),
    (ShareProjectResponse, Foreground),
    (ShowContacts, Foreground),
    (StartLanguageServer, Foreground),
    (SynchronizeBuffers, Foreground),
    (SynchronizeBuffersResponse, Foreground),
    (Test, Foreground),
    (Unfollow, Foreground),
    (UnshareProject, Foreground),
    (UpdateBuffer, Foreground),
    (UpdateBufferFile, Foreground),
    (UpdateChannelBuffer, Foreground),
    (UpdateChannelBufferCollaborators, Foreground),
    (UpdateChannels, Foreground),
    (UpdateUserChannels, Foreground),
    (UpdateContacts, Foreground),
    (UpdateDiagnosticSummary, Foreground),
    (UpdateDiffBase, Foreground),
    (UpdateFollowers, Foreground),
    (UpdateInviteInfo, Foreground),
    (UpdateLanguageServer, Foreground),
    (UpdateParticipantLocation, Foreground),
    (UpdateProject, Foreground),
    (UpdateProjectCollaborator, Foreground),
    (UpdateWorktree, Foreground),
    (UpdateWorktreeSettings, Foreground),
    (UsersResponse, Foreground),
    (LspExtExpandMacro, Background),
    (LspExtExpandMacroResponse, Background),
    (SetRoomParticipantRole, Foreground),
    (BlameBuffer, Foreground),
    (BlameBufferResponse, Foreground),
    (CreateDevServerProject, Background),
    (CreateDevServerProjectResponse, Foreground),
    (CreateDevServer, Foreground),
    (CreateDevServerResponse, Foreground),
    (DevServerInstructions, Foreground),
    (ShutdownDevServer, Foreground),
    (ReconnectDevServer, Foreground),
    (ReconnectDevServerResponse, Foreground),
    (ShareDevServerProject, Foreground),
    (JoinDevServerProject, Foreground),
    (RejoinRemoteProjects, Foreground),
    (RejoinRemoteProjectsResponse, Foreground),
    (MultiLspQuery, Background),
    (MultiLspQueryResponse, Background),
    (DevServerProjectsUpdate, Foreground),
    (ValidateDevServerProjectRequest, Background),
    (DeleteDevServer, Foreground),
    (DeleteDevServerProject, Foreground),
    (OpenNewBuffer, Foreground)
);
328
// Pairs each request message type with the message type sent back in response.
//
// Each entry is `(Request, Response)`. Several requests share a response type
// (for example `Ack`), and `Test` is paired with itself.
//
// NOTE(review): the `request_messages!` macro is imported from the parent module;
// it presumably implements `RequestMessage` with `Response` set to the second
// element of each pair — confirm against the macro definition.
request_messages!(
    (ApplyCodeAction, ApplyCodeActionResponse),
    (
        ApplyCompletionAdditionalEdits,
        ApplyCompletionAdditionalEditsResponse
    ),
    (Call, Ack),
    (CancelCall, Ack),
    (CopyProjectEntry, ProjectEntryResponse),
    (CompleteWithLanguageModel, LanguageModelResponse),
    (ComputeEmbeddings, ComputeEmbeddingsResponse),
    (CountTokensWithLanguageModel, CountTokensResponse),
    (CreateChannel, CreateChannelResponse),
    (CreateProjectEntry, ProjectEntryResponse),
    (CreateRoom, CreateRoomResponse),
    (DeclineCall, Ack),
    (DeleteChannel, Ack),
    (DeleteProjectEntry, ProjectEntryResponse),
    (ExpandProjectEntry, ExpandProjectEntryResponse),
    (Follow, FollowResponse),
    (FormatBuffers, FormatBuffersResponse),
    (FuzzySearchUsers, UsersResponse),
    (GetCachedEmbeddings, GetCachedEmbeddingsResponse),
    (GetChannelMembers, GetChannelMembersResponse),
    (GetChannelMessages, GetChannelMessagesResponse),
    (GetChannelMessagesById, GetChannelMessagesResponse),
    (GetCodeActions, GetCodeActionsResponse),
    (GetCompletions, GetCompletionsResponse),
    (GetDefinition, GetDefinitionResponse),
    (GetImplementation, GetImplementationResponse),
    (GetDocumentHighlights, GetDocumentHighlightsResponse),
    (GetHover, GetHoverResponse),
    (GetNotifications, GetNotificationsResponse),
    (GetPrivateUserInfo, GetPrivateUserInfoResponse),
    (GetProjectSymbols, GetProjectSymbolsResponse),
    (GetReferences, GetReferencesResponse),
    (GetSupermavenApiKey, GetSupermavenApiKeyResponse),
    (GetTypeDefinition, GetTypeDefinitionResponse),
    (GetUsers, UsersResponse),
    (IncomingCall, Ack),
    (InlayHints, InlayHintsResponse),
    (InviteChannelMember, Ack),
    (JoinChannel, JoinRoomResponse),
    (JoinChannelBuffer, JoinChannelBufferResponse),
    (JoinChannelChat, JoinChannelChatResponse),
    (JoinHostedProject, JoinProjectResponse),
    (JoinProject, JoinProjectResponse),
    (JoinRoom, JoinRoomResponse),
    (LeaveChannelBuffer, Ack),
    (LeaveRoom, Ack),
    (MarkNotificationRead, Ack),
    (MoveChannel, Ack),
    (OnTypeFormatting, OnTypeFormattingResponse),
    (OpenBufferById, OpenBufferResponse),
    (OpenBufferByPath, OpenBufferResponse),
    (OpenBufferForSymbol, OpenBufferForSymbolResponse),
    (OpenNewBuffer, OpenBufferResponse),
    (PerformRename, PerformRenameResponse),
    (Ping, Ack),
    (PrepareRename, PrepareRenameResponse),
    (RefreshInlayHints, Ack),
    (RejoinChannelBuffers, RejoinChannelBuffersResponse),
    (RejoinRoom, RejoinRoomResponse),
    (ReloadBuffers, ReloadBuffersResponse),
    (RemoveChannelMember, Ack),
    (RemoveChannelMessage, Ack),
    (UpdateChannelMessage, Ack),
    (RemoveContact, Ack),
    (RenameChannel, RenameChannelResponse),
    (RenameProjectEntry, ProjectEntryResponse),
    (RequestContact, Ack),
    (
        ResolveCompletionDocumentation,
        ResolveCompletionDocumentationResponse
    ),
    (ResolveInlayHint, ResolveInlayHintResponse),
    (RespondToChannelInvite, Ack),
    (RespondToContactRequest, Ack),
    (SaveBuffer, BufferSaved),
    (SearchProject, SearchProjectResponse),
    (SendChannelMessage, SendChannelMessageResponse),
    (SetChannelMemberRole, Ack),
    (SetChannelVisibility, Ack),
    (ShareProject, ShareProjectResponse),
    (SynchronizeBuffers, SynchronizeBuffersResponse),
    (Test, Test),
    (UpdateBuffer, Ack),
    (UpdateParticipantLocation, Ack),
    (UpdateProject, Ack),
    (UpdateWorktree, Ack),
    (LspExtExpandMacro, LspExtExpandMacroResponse),
    (SetRoomParticipantRole, Ack),
    (BlameBuffer, BlameBufferResponse),
    (CreateDevServerProject, CreateDevServerProjectResponse),
    (CreateDevServer, CreateDevServerResponse),
    (ShutdownDevServer, Ack),
    (ShareDevServerProject, ShareProjectResponse),
    (JoinDevServerProject, JoinProjectResponse),
    (RejoinRemoteProjects, RejoinRemoteProjectsResponse),
    (ReconnectDevServer, ReconnectDevServerResponse),
    (ValidateDevServerProjectRequest, Ack),
    (MultiLspQuery, MultiLspQueryResponse),
    (DeleteDevServer, Ack),
    (DeleteDevServerProject, Ack),
);
434
// Lists the message types that are scoped to a project.
//
// The leading `{project_id, ShareProject}` group is followed by the message types
// it applies to.
//
// NOTE(review): the `entity_messages!` macro is imported from the parent module;
// it presumably implements `EntityMessage` for each listed type, reading the
// `project_id` field as the remote entity id and using `ShareProject` as the
// entity type — confirm against the macro definition.
entity_messages!(
    {project_id, ShareProject},
    AddProjectCollaborator,
    ApplyCodeAction,
    ApplyCompletionAdditionalEdits,
    BlameBuffer,
    BufferReloaded,
    BufferSaved,
    CopyProjectEntry,
    CreateBufferForPeer,
    CreateProjectEntry,
    DeleteProjectEntry,
    ExpandProjectEntry,
    FormatBuffers,
    GetCodeActions,
    GetCompletions,
    GetDefinition,
    GetImplementation,
    GetDocumentHighlights,
    GetHover,
    GetProjectSymbols,
    GetReferences,
    GetTypeDefinition,
    InlayHints,
    JoinProject,
    LeaveProject,
    MultiLspQuery,
    OnTypeFormatting,
    OpenNewBuffer,
    OpenBufferById,
    OpenBufferByPath,
    OpenBufferForSymbol,
    PerformRename,
    PrepareRename,
    RefreshInlayHints,
    ReloadBuffers,
    RemoveProjectCollaborator,
    RenameProjectEntry,
    ResolveCompletionDocumentation,
    ResolveInlayHint,
    SaveBuffer,
    SearchProject,
    StartLanguageServer,
    SynchronizeBuffers,
    UnshareProject,
    UpdateBuffer,
    UpdateBufferFile,
    UpdateDiagnosticSummary,
    UpdateDiffBase,
    UpdateLanguageServer,
    UpdateProject,
    UpdateProjectCollaborator,
    UpdateWorktree,
    UpdateWorktreeSettings,
    LspExtExpandMacro,
);
491
// Lists the message types that are scoped to a channel.
//
// NOTE(review): presumably implements `EntityMessage` for each listed type, reading
// the `channel_id` field as the remote entity id and using `Channel` as the entity
// type — the macro lives in the parent module, so confirm against its definition.
entity_messages!(
    {channel_id, Channel},
    ChannelMessageSent,
    ChannelMessageUpdate,
    RemoveChannelMessage,
    UpdateChannelMessage,
    UpdateChannelBuffer,
    UpdateChannelBufferCollaborators,
);
501
/// Number of bytes in one kibibyte.
const KIB: usize = 1024;
/// Number of bytes in one mebibyte.
const MIB: usize = KIB * 1024;
/// Capacity (in bytes) that `MessageStream`'s scratch buffer is shrunk back to
/// after each envelope is written or read, so one large message does not keep a
/// large allocation alive.
const MAX_BUFFER_LEN: usize = MIB;
505
/// A stream of protobuf messages layered over a websocket sink and/or stream `S`.
///
/// Envelopes are protobuf-encoded and zstd-compressed on `write`, and decompressed
/// and protobuf-decoded on `read`.
pub struct MessageStream<S> {
    /// The wrapped websocket sink/stream.
    stream: S,
    /// Scratch buffer holding the uncompressed protobuf bytes of the envelope being
    /// written or read; it is cleared and shrunk to `MAX_BUFFER_LEN` after each use.
    encoding_buffer: Vec<u8>,
}
511
/// A message exchanged through a `MessageStream`.
// `Envelope` is much larger than the unit variants, hence the lint allowance.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum Message {
    /// A protobuf envelope, carried in a binary websocket frame.
    Envelope(Envelope),
    /// A websocket ping frame.
    Ping,
    /// A websocket pong frame.
    Pong,
}
519
520impl<S> MessageStream<S> {
521 pub fn new(stream: S) -> Self {
522 Self {
523 stream,
524 encoding_buffer: Vec::new(),
525 }
526 }
527
528 pub fn inner_mut(&mut self) -> &mut S {
529 &mut self.stream
530 }
531}
532
533impl<S> MessageStream<S>
534where
535 S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
536{
537 pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
538 #[cfg(any(test, feature = "test-support"))]
539 const COMPRESSION_LEVEL: i32 = -7;
540
541 #[cfg(not(any(test, feature = "test-support")))]
542 const COMPRESSION_LEVEL: i32 = 4;
543
544 match message {
545 Message::Envelope(message) => {
546 self.encoding_buffer.reserve(message.encoded_len());
547 message
548 .encode(&mut self.encoding_buffer)
549 .map_err(io::Error::from)?;
550 let buffer =
551 zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
552 .unwrap();
553
554 self.encoding_buffer.clear();
555 self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
556 self.stream.send(WebSocketMessage::Binary(buffer)).await?;
557 }
558 Message::Ping => {
559 self.stream
560 .send(WebSocketMessage::Ping(Default::default()))
561 .await?;
562 }
563 Message::Pong => {
564 self.stream
565 .send(WebSocketMessage::Pong(Default::default()))
566 .await?;
567 }
568 }
569
570 Ok(())
571 }
572}
573
574impl<S> MessageStream<S>
575where
576 S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
577{
578 pub async fn read(&mut self) -> Result<(Message, Instant), anyhow::Error> {
579 while let Some(bytes) = self.stream.next().await {
580 let received_at = Instant::now();
581 match bytes? {
582 WebSocketMessage::Binary(bytes) => {
583 zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
584 let envelope = Envelope::decode(self.encoding_buffer.as_slice())
585 .map_err(io::Error::from)?;
586
587 self.encoding_buffer.clear();
588 self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
589 return Ok((Message::Envelope(envelope), received_at));
590 }
591 WebSocketMessage::Ping(_) => return Ok((Message::Ping, received_at)),
592 WebSocketMessage::Pong(_) => return Ok((Message::Pong, received_at)),
593 WebSocketMessage::Close(_) => break,
594 _ => {}
595 }
596 }
597 Err(anyhow!("connection closed"))
598 }
599}
600
601impl From<Timestamp> for SystemTime {
602 fn from(val: Timestamp) -> Self {
603 UNIX_EPOCH
604 .checked_add(Duration::new(val.seconds, val.nanos))
605 .unwrap()
606 }
607}
608
609impl From<SystemTime> for Timestamp {
610 fn from(time: SystemTime) -> Self {
611 let duration = time.duration_since(UNIX_EPOCH).unwrap();
612 Self {
613 seconds: duration.as_secs(),
614 nanos: duration.subsec_nanos(),
615 }
616 }
617}
618
619impl From<u128> for Nonce {
620 fn from(nonce: u128) -> Self {
621 let upper_half = (nonce >> 64) as u64;
622 let lower_half = nonce as u64;
623 Self {
624 upper_half,
625 lower_half,
626 }
627 }
628}
629
630impl From<Nonce> for u128 {
631 fn from(nonce: Nonce) -> Self {
632 let upper_half = (nonce.upper_half as u128) << 64;
633 let lower_half = nonce.lower_half as u128;
634 upper_half | lower_half
635 }
636}
637
638pub fn split_worktree_update(
639 mut message: UpdateWorktree,
640 max_chunk_size: usize,
641) -> impl Iterator<Item = UpdateWorktree> {
642 let mut done_files = false;
643
644 let mut repository_map = message
645 .updated_repositories
646 .into_iter()
647 .map(|repo| (repo.work_directory_id, repo))
648 .collect::<HashMap<_, _>>();
649
650 iter::from_fn(move || {
651 if done_files {
652 return None;
653 }
654
655 let updated_entries_chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
656 let updated_entries: Vec<_> = message
657 .updated_entries
658 .drain(..updated_entries_chunk_size)
659 .collect();
660
661 let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size);
662 let removed_entries = message
663 .removed_entries
664 .drain(..removed_entries_chunk_size)
665 .collect();
666
667 done_files = message.updated_entries.is_empty() && message.removed_entries.is_empty();
668
669 let mut updated_repositories = Vec::new();
670
671 if !repository_map.is_empty() {
672 for entry in &updated_entries {
673 if let Some(repo) = repository_map.remove(&entry.id) {
674 updated_repositories.push(repo)
675 }
676 }
677 }
678
679 let removed_repositories = if done_files {
680 mem::take(&mut message.removed_repositories)
681 } else {
682 Default::default()
683 };
684
685 if done_files {
686 updated_repositories.extend(mem::take(&mut repository_map).into_values());
687 }
688
689 Some(UpdateWorktree {
690 project_id: message.project_id,
691 worktree_id: message.worktree_id,
692 root_name: message.root_name.clone(),
693 abs_path: message.abs_path.clone(),
694 updated_entries,
695 removed_entries,
696 scan_id: message.scan_id,
697 is_last_update: done_files && message.is_last_update,
698 updated_repositories,
699 removed_repositories,
700 })
701 })
702}
703
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an envelope whose `UpdateWorktree` payload has a root name made of
    /// `"abcdefg"` repeated `repeats` times.
    fn worktree_message(repeats: usize) -> Message {
        let payload = envelope::Payload::UpdateWorktree(UpdateWorktree {
            root_name: "abcdefg".repeat(repeats),
            ..Default::default()
        });
        Message::Envelope(Envelope {
            payload: Some(payload),
            ..Default::default()
        })
    }

    /// The scratch buffer must be shrunk back to `MAX_BUFFER_LEN` after every
    /// write and read, for a small message and for a very large one.
    #[gpui::test]
    async fn test_buffer_size() {
        let (tx, rx) = futures::channel::mpsc::unbounded();

        let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
        for repeats in [10, 1000000] {
            sink.write(worktree_message(repeats)).await.unwrap();
            assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
        }

        let mut stream = MessageStream::new(rx.map(anyhow::Ok));
        for _ in 0..2 {
            stream.read().await.unwrap();
            assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
        }
    }

    /// Packing a `PeerId` into a `u64` and unpacking it again must be lossless,
    /// including at the extremes of both fields.
    #[gpui::test]
    fn test_converting_peer_id_from_and_to_u64() {
        let cases = [
            (10, 3),
            (u32::MAX, 3),
            (10, u32::MAX),
            (u32::MAX, u32::MAX),
        ];
        for (owner_id, id) in cases {
            let peer_id = PeerId { owner_id, id };
            assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
        }
    }
}