1#![allow(non_snake_case)]
2
3use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
4use anyhow::{anyhow, Result};
5use async_tungstenite::tungstenite::Message as WebSocketMessage;
6use collections::HashMap;
7use futures::{SinkExt as _, StreamExt as _};
8use prost::Message as _;
9use serde::Serialize;
10use std::any::{Any, TypeId};
11use std::time::Instant;
12use std::{
13 cmp,
14 fmt::Debug,
15 io, iter,
16 time::{Duration, SystemTime, UNIX_EPOCH},
17};
18use std::{fmt, mem};
19
// Textually includes `zed.messages.rs` from the build output directory (`OUT_DIR`) at compile
// time. NOTE(review): presumably the prost-generated protobuf types used below (`Envelope`,
// `PeerId`, `UpdateWorktree`, ...) — confirm against the build script.
include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
21
/// A message type that can be placed into, and extracted from, an `Envelope`.
///
/// NOTE(review): implementations are presumably generated by the `messages!` invocation
/// in this file — confirm against the macro definition.
pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
    /// Name of the message type; reported by `AnyTypedEnvelope::payload_type_name`.
    const NAME: &'static str;
    /// Priority class of the message; `AnyTypedEnvelope::is_background` returns `true`
    /// exactly when this is `MessagePriority::Background`.
    const PRIORITY: MessagePriority;
    /// Consumes the message and produces an `Envelope` for it.
    ///
    /// NOTE(review): judging by the parameter names, `id` is the envelope's own message id,
    /// `responding_to` the id of the request being answered (if any), and
    /// `original_sender_id` the peer that first sent the message — confirm with the
    /// generated implementations.
    fn into_envelope(
        self,
        id: u32,
        responding_to: Option<u32>,
        original_sender_id: Option<PeerId>,
    ) -> Envelope;
    /// Attempts to extract a message of this type from `envelope`, returning `None` when
    /// no value of this type can be produced from it.
    fn from_envelope(envelope: Envelope) -> Option<Self>;
}
33
/// An enveloped message that is addressed to a specific remote entity.
pub trait EntityMessage: EnvelopedMessage {
    /// The kind of entity this message refers to.
    type Entity;
    /// Returns the 64-bit id of the remote entity this message refers to.
    fn remote_entity_id(&self) -> u64;
}
38
/// An enveloped message that is a request and therefore has an associated response type.
pub trait RequestMessage: EnvelopedMessage {
    /// The message type sent back in reply to this request.
    type Response: EnvelopedMessage;
}
42
/// Object-safe, type-erased interface over a `TypedEnvelope<T>`.
///
/// Lets code inspect an envelope's metadata and payload type without knowing the concrete
/// payload type `T` at compile time.
pub trait AnyTypedEnvelope: 'static + Send + Sync {
    /// `TypeId` of the payload type carried by the envelope.
    fn payload_type_id(&self) -> TypeId;
    /// Name of the payload type carried by the envelope.
    fn payload_type_name(&self) -> &'static str;
    /// Borrows the envelope as `&dyn Any` so it can be downcast to its concrete type.
    fn as_any(&self) -> &dyn Any;
    /// Converts the boxed envelope into a boxed `Any` so it can be downcast by value.
    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
    /// Whether the payload type has background priority.
    fn is_background(&self) -> bool;
    /// The original sender recorded on the envelope, if any.
    fn original_sender_id(&self) -> Option<PeerId>;
    /// The connection the envelope was received from.
    fn sender_id(&self) -> ConnectionId;
    /// The id of the enveloped message.
    fn message_id(&self) -> u32;
}
53
/// Priority class assigned to every message type via `EnvelopedMessage::PRIORITY`.
///
/// The standard traits are derived so the priority can be logged, copied freely and compared
/// with `==` instead of requiring a pattern match at every use site.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MessagePriority {
    /// The message type is declared as foreground work.
    Foreground,
    /// The message type is declared as background work; `AnyTypedEnvelope::is_background`
    /// reports `true` for such payloads.
    Background,
}
58
59impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
60 fn payload_type_id(&self) -> TypeId {
61 TypeId::of::<T>()
62 }
63
64 fn payload_type_name(&self) -> &'static str {
65 T::NAME
66 }
67
68 fn as_any(&self) -> &dyn Any {
69 self
70 }
71
72 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
73 self
74 }
75
76 fn is_background(&self) -> bool {
77 matches!(T::PRIORITY, MessagePriority::Background)
78 }
79
80 fn original_sender_id(&self) -> Option<PeerId> {
81 self.original_sender_id
82 }
83
84 fn sender_id(&self) -> ConnectionId {
85 self.sender_id
86 }
87
88 fn message_id(&self) -> u32 {
89 self.message_id
90 }
91}
92
93impl PeerId {
94 pub fn from_u64(peer_id: u64) -> Self {
95 let owner_id = (peer_id >> 32) as u32;
96 let id = peer_id as u32;
97 Self { owner_id, id }
98 }
99
100 pub fn as_u64(self) -> u64 {
101 ((self.owner_id as u64) << 32) | (self.id as u64)
102 }
103}
104
// Marker impl: `PeerId` values are copied implicitly (methods such as `as_u64` take `self`
// by value). NOTE(review): presumably written by hand because the struct definition comes
// from the included generated file — confirm.
impl Copy for PeerId {}

// Marker impl: equality on `PeerId` is a full equivalence relation, as required by the
// `Ord` implementation in this file.
impl Eq for PeerId {}
108
109impl Ord for PeerId {
110 fn cmp(&self, other: &Self) -> cmp::Ordering {
111 self.owner_id
112 .cmp(&other.owner_id)
113 .then_with(|| self.id.cmp(&other.id))
114 }
115}
116
117impl PartialOrd for PeerId {
118 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
119 Some(self.cmp(other))
120 }
121}
122
123impl std::hash::Hash for PeerId {
124 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
125 self.owner_id.hash(state);
126 self.id.hash(state);
127 }
128}
129
130impl fmt::Display for PeerId {
131 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
132 write!(f, "{}/{}", self.owner_id, self.id)
133 }
134}
135
// Registry of every message type together with its priority class. Each entry is
// `(MessageType, Priority)`, where the priority is one of the `MessagePriority` variants
// (`Foreground` or `Background`).
// NOTE(review): the `messages!` macro is defined outside this file; it presumably generates
// the `EnvelopedMessage` implementation for each listed type — confirm.
messages!(
    (Ack, Foreground),
    (AckBufferOperation, Background),
    (AckChannelMessage, Background),
    (AddNotification, Foreground),
    (AddProjectCollaborator, Foreground),
    (ApplyCodeAction, Background),
    (ApplyCodeActionResponse, Background),
    (ApplyCompletionAdditionalEdits, Background),
    (ApplyCompletionAdditionalEditsResponse, Background),
    (BufferReloaded, Foreground),
    (BufferSaved, Foreground),
    (Call, Foreground),
    (CallCanceled, Foreground),
    (CancelCall, Foreground),
    (ChannelMessageSent, Foreground),
    (ChannelMessageUpdate, Foreground),
    (CompleteWithLanguageModel, Background),
    (ComputeEmbeddings, Background),
    (ComputeEmbeddingsResponse, Background),
    (CopyProjectEntry, Foreground),
    (CountTokensWithLanguageModel, Background),
    (CountTokensResponse, Background),
    (CreateBufferForPeer, Foreground),
    (CreateChannel, Foreground),
    (CreateChannelResponse, Foreground),
    (CreateProjectEntry, Foreground),
    (CreateRoom, Foreground),
    (CreateRoomResponse, Foreground),
    (DeclineCall, Foreground),
    (DeleteChannel, Foreground),
    (DeleteNotification, Foreground),
    (UpdateNotification, Foreground),
    (DeleteProjectEntry, Foreground),
    (EndStream, Foreground),
    (Error, Foreground),
    (ExpandProjectEntry, Foreground),
    (ExpandProjectEntryResponse, Foreground),
    (Follow, Foreground),
    (FollowResponse, Foreground),
    (FormatBuffers, Foreground),
    (FormatBuffersResponse, Foreground),
    (FuzzySearchUsers, Foreground),
    (GetCachedEmbeddings, Background),
    (GetCachedEmbeddingsResponse, Background),
    (GetChannelMembers, Foreground),
    (GetChannelMembersResponse, Foreground),
    (GetChannelMessages, Background),
    (GetChannelMessagesById, Background),
    (GetChannelMessagesResponse, Background),
    (GetCodeActions, Background),
    (GetCodeActionsResponse, Background),
    (GetCompletions, Background),
    (GetCompletionsResponse, Background),
    (GetDefinition, Background),
    (GetDefinitionResponse, Background),
    (GetDocumentHighlights, Background),
    (GetDocumentHighlightsResponse, Background),
    (GetHover, Background),
    (GetHoverResponse, Background),
    (GetNotifications, Foreground),
    (GetNotificationsResponse, Foreground),
    (GetPrivateUserInfo, Foreground),
    (GetPrivateUserInfoResponse, Foreground),
    (GetProjectSymbols, Background),
    (GetProjectSymbolsResponse, Background),
    (GetReferences, Background),
    (GetReferencesResponse, Background),
    (GetTypeDefinition, Background),
    (GetTypeDefinitionResponse, Background),
    (GetImplementation, Background),
    (GetImplementationResponse, Background),
    (GetUsers, Foreground),
    (Hello, Foreground),
    (IncomingCall, Foreground),
    (InlayHints, Background),
    (InlayHintsResponse, Background),
    (InviteChannelMember, Foreground),
    (JoinChannel, Foreground),
    (JoinChannelBuffer, Foreground),
    (JoinChannelBufferResponse, Foreground),
    (JoinChannelChat, Foreground),
    (JoinChannelChatResponse, Foreground),
    (JoinProject, Foreground),
    (JoinHostedProject, Foreground),
    (JoinProjectResponse, Foreground),
    (JoinRoom, Foreground),
    (JoinRoomResponse, Foreground),
    (LanguageModelResponse, Background),
    (LeaveChannelBuffer, Background),
    (LeaveChannelChat, Foreground),
    (LeaveProject, Foreground),
    (LeaveRoom, Foreground),
    (MarkNotificationRead, Foreground),
    (MoveChannel, Foreground),
    (OnTypeFormatting, Background),
    (OnTypeFormattingResponse, Background),
    (OpenBufferById, Background),
    (OpenBufferByPath, Background),
    (OpenBufferForSymbol, Background),
    (OpenBufferForSymbolResponse, Background),
    (OpenBufferResponse, Background),
    (PerformRename, Background),
    (PerformRenameResponse, Background),
    (Ping, Foreground),
    (PrepareRename, Background),
    (PrepareRenameResponse, Background),
    (ProjectEntryResponse, Foreground),
    (RefreshInlayHints, Foreground),
    (RejoinChannelBuffers, Foreground),
    (RejoinChannelBuffersResponse, Foreground),
    (RejoinRoom, Foreground),
    (RejoinRoomResponse, Foreground),
    (ReloadBuffers, Foreground),
    (ReloadBuffersResponse, Foreground),
    (RemoveChannelMember, Foreground),
    (RemoveChannelMessage, Foreground),
    (UpdateChannelMessage, Foreground),
    (RemoveContact, Foreground),
    (RemoveProjectCollaborator, Foreground),
    (RenameChannel, Foreground),
    (RenameChannelResponse, Foreground),
    (RenameProjectEntry, Foreground),
    (RequestContact, Foreground),
    (ResolveCompletionDocumentation, Background),
    (ResolveCompletionDocumentationResponse, Background),
    (ResolveInlayHint, Background),
    (ResolveInlayHintResponse, Background),
    (RespondToChannelInvite, Foreground),
    (RespondToContactRequest, Foreground),
    (RoomUpdated, Foreground),
    (SaveBuffer, Foreground),
    (SetChannelMemberRole, Foreground),
    (SetChannelVisibility, Foreground),
    (SearchProject, Background),
    (SearchProjectResponse, Background),
    (SendChannelMessage, Background),
    (SendChannelMessageResponse, Background),
    (ShareProject, Foreground),
    (ShareProjectResponse, Foreground),
    (ShowContacts, Foreground),
    (StartLanguageServer, Foreground),
    (SynchronizeBuffers, Foreground),
    (SynchronizeBuffersResponse, Foreground),
    (Test, Foreground),
    (Unfollow, Foreground),
    (UnshareProject, Foreground),
    (UpdateBuffer, Foreground),
    (UpdateBufferFile, Foreground),
    (UpdateChannelBuffer, Foreground),
    (UpdateChannelBufferCollaborators, Foreground),
    (UpdateChannels, Foreground),
    (UpdateUserChannels, Foreground),
    (UpdateContacts, Foreground),
    (UpdateDiagnosticSummary, Foreground),
    (UpdateDiffBase, Foreground),
    (UpdateFollowers, Foreground),
    (UpdateInviteInfo, Foreground),
    (UpdateLanguageServer, Foreground),
    (UpdateParticipantLocation, Foreground),
    (UpdateProject, Foreground),
    (UpdateProjectCollaborator, Foreground),
    (UpdateWorktree, Foreground),
    (UpdateWorktreeSettings, Foreground),
    (UsersResponse, Foreground),
    (LspExtExpandMacro, Background),
    (LspExtExpandMacroResponse, Background),
    (SetRoomParticipantRole, Foreground),
    (BlameBuffer, Foreground),
    (BlameBufferResponse, Foreground),
    (CreateDevServerProject, Background),
    (CreateDevServerProjectResponse, Foreground),
    (CreateDevServer, Foreground),
    (CreateDevServerResponse, Foreground),
    (DevServerInstructions, Foreground),
    (ShutdownDevServer, Foreground),
    (ReconnectDevServer, Foreground),
    (ReconnectDevServerResponse, Foreground),
    (ShareDevServerProject, Foreground),
    (JoinDevServerProject, Foreground),
    (RejoinRemoteProjects, Foreground),
    (RejoinRemoteProjectsResponse, Foreground),
    (MultiLspQuery, Background),
    (MultiLspQueryResponse, Background),
    (DevServerProjectsUpdate, Foreground),
    (ValidateDevServerProjectRequest, Background),
    (DeleteDevServer, Foreground),
    (DeleteDevServerProject, Foreground),
    (OpenNewBuffer, Foreground)
);
326
// Request/response pairing. Each entry is `(RequestType, ResponseType)`; several requests
// share a response type (for example many are answered with a plain `Ack`).
// NOTE(review): the `request_messages!` macro is defined outside this file; it presumably
// implements `RequestMessage` for the first type with `Response` set to the second —
// confirm.
request_messages!(
    (ApplyCodeAction, ApplyCodeActionResponse),
    (
        ApplyCompletionAdditionalEdits,
        ApplyCompletionAdditionalEditsResponse
    ),
    (Call, Ack),
    (CancelCall, Ack),
    (CopyProjectEntry, ProjectEntryResponse),
    (CompleteWithLanguageModel, LanguageModelResponse),
    (ComputeEmbeddings, ComputeEmbeddingsResponse),
    (CountTokensWithLanguageModel, CountTokensResponse),
    (CreateChannel, CreateChannelResponse),
    (CreateProjectEntry, ProjectEntryResponse),
    (CreateRoom, CreateRoomResponse),
    (DeclineCall, Ack),
    (DeleteChannel, Ack),
    (DeleteProjectEntry, ProjectEntryResponse),
    (ExpandProjectEntry, ExpandProjectEntryResponse),
    (Follow, FollowResponse),
    (FormatBuffers, FormatBuffersResponse),
    (FuzzySearchUsers, UsersResponse),
    (GetCachedEmbeddings, GetCachedEmbeddingsResponse),
    (GetChannelMembers, GetChannelMembersResponse),
    (GetChannelMessages, GetChannelMessagesResponse),
    (GetChannelMessagesById, GetChannelMessagesResponse),
    (GetCodeActions, GetCodeActionsResponse),
    (GetCompletions, GetCompletionsResponse),
    (GetDefinition, GetDefinitionResponse),
    (GetImplementation, GetImplementationResponse),
    (GetDocumentHighlights, GetDocumentHighlightsResponse),
    (GetHover, GetHoverResponse),
    (GetNotifications, GetNotificationsResponse),
    (GetPrivateUserInfo, GetPrivateUserInfoResponse),
    (GetProjectSymbols, GetProjectSymbolsResponse),
    (GetReferences, GetReferencesResponse),
    (GetTypeDefinition, GetTypeDefinitionResponse),
    (GetUsers, UsersResponse),
    (IncomingCall, Ack),
    (InlayHints, InlayHintsResponse),
    (InviteChannelMember, Ack),
    (JoinChannel, JoinRoomResponse),
    (JoinChannelBuffer, JoinChannelBufferResponse),
    (JoinChannelChat, JoinChannelChatResponse),
    (JoinHostedProject, JoinProjectResponse),
    (JoinProject, JoinProjectResponse),
    (JoinRoom, JoinRoomResponse),
    (LeaveChannelBuffer, Ack),
    (LeaveRoom, Ack),
    (MarkNotificationRead, Ack),
    (MoveChannel, Ack),
    (OnTypeFormatting, OnTypeFormattingResponse),
    (OpenBufferById, OpenBufferResponse),
    (OpenBufferByPath, OpenBufferResponse),
    (OpenBufferForSymbol, OpenBufferForSymbolResponse),
    (OpenNewBuffer, OpenBufferResponse),
    (PerformRename, PerformRenameResponse),
    (Ping, Ack),
    (PrepareRename, PrepareRenameResponse),
    (RefreshInlayHints, Ack),
    (RejoinChannelBuffers, RejoinChannelBuffersResponse),
    (RejoinRoom, RejoinRoomResponse),
    (ReloadBuffers, ReloadBuffersResponse),
    (RemoveChannelMember, Ack),
    (RemoveChannelMessage, Ack),
    (UpdateChannelMessage, Ack),
    (RemoveContact, Ack),
    (RenameChannel, RenameChannelResponse),
    (RenameProjectEntry, ProjectEntryResponse),
    (RequestContact, Ack),
    (
        ResolveCompletionDocumentation,
        ResolveCompletionDocumentationResponse
    ),
    (ResolveInlayHint, ResolveInlayHintResponse),
    (RespondToChannelInvite, Ack),
    (RespondToContactRequest, Ack),
    (SaveBuffer, BufferSaved),
    (SearchProject, SearchProjectResponse),
    (SendChannelMessage, SendChannelMessageResponse),
    (SetChannelMemberRole, Ack),
    (SetChannelVisibility, Ack),
    (ShareProject, ShareProjectResponse),
    (SynchronizeBuffers, SynchronizeBuffersResponse),
    (Test, Test),
    (UpdateBuffer, Ack),
    (UpdateParticipantLocation, Ack),
    (UpdateProject, Ack),
    (UpdateWorktree, Ack),
    (LspExtExpandMacro, LspExtExpandMacroResponse),
    (SetRoomParticipantRole, Ack),
    (BlameBuffer, BlameBufferResponse),
    (CreateDevServerProject, CreateDevServerProjectResponse),
    (CreateDevServer, CreateDevServerResponse),
    (ShutdownDevServer, Ack),
    (ShareDevServerProject, ShareProjectResponse),
    (JoinDevServerProject, JoinProjectResponse),
    (RejoinRemoteProjects, RejoinRemoteProjectsResponse),
    (ReconnectDevServer, ReconnectDevServerResponse),
    (ValidateDevServerProjectRequest, Ack),
    (MultiLspQuery, MultiLspQueryResponse),
    (DeleteDevServer, Ack),
    (DeleteDevServerProject, Ack),
);
431
// Project-scoped messages. The leading `{project_id, ShareProject}` group is followed by the
// message types it applies to.
// NOTE(review): the `entity_messages!` macro is defined outside this file; it presumably
// implements `EntityMessage` for each listed type, reading the `project_id` field as
// `remote_entity_id` and using `ShareProject` as the `Entity` type — confirm.
entity_messages!(
    {project_id, ShareProject},
    AddProjectCollaborator,
    ApplyCodeAction,
    ApplyCompletionAdditionalEdits,
    BlameBuffer,
    BufferReloaded,
    BufferSaved,
    CopyProjectEntry,
    CreateBufferForPeer,
    CreateProjectEntry,
    DeleteProjectEntry,
    ExpandProjectEntry,
    FormatBuffers,
    GetCodeActions,
    GetCompletions,
    GetDefinition,
    GetImplementation,
    GetDocumentHighlights,
    GetHover,
    GetProjectSymbols,
    GetReferences,
    GetTypeDefinition,
    InlayHints,
    JoinProject,
    LeaveProject,
    MultiLspQuery,
    OnTypeFormatting,
    OpenNewBuffer,
    OpenBufferById,
    OpenBufferByPath,
    OpenBufferForSymbol,
    PerformRename,
    PrepareRename,
    RefreshInlayHints,
    ReloadBuffers,
    RemoveProjectCollaborator,
    RenameProjectEntry,
    ResolveCompletionDocumentation,
    ResolveInlayHint,
    SaveBuffer,
    SearchProject,
    StartLanguageServer,
    SynchronizeBuffers,
    UnshareProject,
    UpdateBuffer,
    UpdateBufferFile,
    UpdateDiagnosticSummary,
    UpdateDiffBase,
    UpdateLanguageServer,
    UpdateProject,
    UpdateProjectCollaborator,
    UpdateWorktree,
    UpdateWorktreeSettings,
    LspExtExpandMacro,
);
488
// Channel-scoped messages. The leading `{channel_id, Channel}` group is followed by the
// message types it applies to.
// NOTE(review): presumably implements `EntityMessage` for each listed type, reading the
// `channel_id` field as `remote_entity_id` and using `Channel` as the `Entity` type —
// confirm against the `entity_messages!` macro definition.
entity_messages!(
    {channel_id, Channel},
    ChannelMessageSent,
    ChannelMessageUpdate,
    RemoveChannelMessage,
    UpdateChannelMessage,
    UpdateChannelBuffer,
    UpdateChannelBufferCollaborators,
);
498
// One kibibyte, in bytes.
const KIB: usize = 1024;
// One mebibyte, in bytes.
const MIB: usize = KIB * 1024;
// Capacity limit passed to `Vec::shrink_to` on `MessageStream`'s scratch buffer after each
// message, so an unusually large message does not keep its allocation around.
const MAX_BUFFER_LEN: usize = MIB;
502
/// A stream of protobuf messages.
///
/// Wraps an underlying transport `S` and adds the protobuf + zstd framing implemented by
/// `write` (when `S` is a sink of WebSocket messages) and `read` (when `S` is a stream of
/// WebSocket messages).
pub struct MessageStream<S> {
    // The wrapped transport; exposed mutably through `inner_mut`.
    stream: S,
    // Scratch buffer for the uncompressed protobuf bytes of the message currently being
    // written or read. It is cleared and shrunk to at most `MAX_BUFFER_LEN` after use.
    encoding_buffer: Vec<u8>,
}
508
/// A unit of traffic on a `MessageStream`.
// `Envelope` is much larger than the unit variants, hence the lint exemption.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum Message {
    /// A protobuf `Envelope`; travels as a compressed binary WebSocket frame.
    Envelope(Envelope),
    /// A WebSocket ping frame.
    Ping,
    /// A WebSocket pong frame.
    Pong,
}
516
517impl<S> MessageStream<S> {
518 pub fn new(stream: S) -> Self {
519 Self {
520 stream,
521 encoding_buffer: Vec::new(),
522 }
523 }
524
525 pub fn inner_mut(&mut self) -> &mut S {
526 &mut self.stream
527 }
528}
529
530impl<S> MessageStream<S>
531where
532 S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
533{
534 pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
535 #[cfg(any(test, feature = "test-support"))]
536 const COMPRESSION_LEVEL: i32 = -7;
537
538 #[cfg(not(any(test, feature = "test-support")))]
539 const COMPRESSION_LEVEL: i32 = 4;
540
541 match message {
542 Message::Envelope(message) => {
543 self.encoding_buffer.reserve(message.encoded_len());
544 message
545 .encode(&mut self.encoding_buffer)
546 .map_err(io::Error::from)?;
547 let buffer =
548 zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
549 .unwrap();
550
551 self.encoding_buffer.clear();
552 self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
553 self.stream.send(WebSocketMessage::Binary(buffer)).await?;
554 }
555 Message::Ping => {
556 self.stream
557 .send(WebSocketMessage::Ping(Default::default()))
558 .await?;
559 }
560 Message::Pong => {
561 self.stream
562 .send(WebSocketMessage::Pong(Default::default()))
563 .await?;
564 }
565 }
566
567 Ok(())
568 }
569}
570
571impl<S> MessageStream<S>
572where
573 S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
574{
575 pub async fn read(&mut self) -> Result<(Message, Instant), anyhow::Error> {
576 while let Some(bytes) = self.stream.next().await {
577 let received_at = Instant::now();
578 match bytes? {
579 WebSocketMessage::Binary(bytes) => {
580 zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
581 let envelope = Envelope::decode(self.encoding_buffer.as_slice())
582 .map_err(io::Error::from)?;
583
584 self.encoding_buffer.clear();
585 self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
586 return Ok((Message::Envelope(envelope), received_at));
587 }
588 WebSocketMessage::Ping(_) => return Ok((Message::Ping, received_at)),
589 WebSocketMessage::Pong(_) => return Ok((Message::Pong, received_at)),
590 WebSocketMessage::Close(_) => break,
591 _ => {}
592 }
593 }
594 Err(anyhow!("connection closed"))
595 }
596}
597
598impl From<Timestamp> for SystemTime {
599 fn from(val: Timestamp) -> Self {
600 UNIX_EPOCH
601 .checked_add(Duration::new(val.seconds, val.nanos))
602 .unwrap()
603 }
604}
605
606impl From<SystemTime> for Timestamp {
607 fn from(time: SystemTime) -> Self {
608 let duration = time.duration_since(UNIX_EPOCH).unwrap();
609 Self {
610 seconds: duration.as_secs(),
611 nanos: duration.subsec_nanos(),
612 }
613 }
614}
615
616impl From<u128> for Nonce {
617 fn from(nonce: u128) -> Self {
618 let upper_half = (nonce >> 64) as u64;
619 let lower_half = nonce as u64;
620 Self {
621 upper_half,
622 lower_half,
623 }
624 }
625}
626
627impl From<Nonce> for u128 {
628 fn from(nonce: Nonce) -> Self {
629 let upper_half = (nonce.upper_half as u128) << 64;
630 let lower_half = nonce.lower_half as u128;
631 upper_half | lower_half
632 }
633}
634
635pub fn split_worktree_update(
636 mut message: UpdateWorktree,
637 max_chunk_size: usize,
638) -> impl Iterator<Item = UpdateWorktree> {
639 let mut done_files = false;
640
641 let mut repository_map = message
642 .updated_repositories
643 .into_iter()
644 .map(|repo| (repo.work_directory_id, repo))
645 .collect::<HashMap<_, _>>();
646
647 iter::from_fn(move || {
648 if done_files {
649 return None;
650 }
651
652 let updated_entries_chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
653 let updated_entries: Vec<_> = message
654 .updated_entries
655 .drain(..updated_entries_chunk_size)
656 .collect();
657
658 let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size);
659 let removed_entries = message
660 .removed_entries
661 .drain(..removed_entries_chunk_size)
662 .collect();
663
664 done_files = message.updated_entries.is_empty() && message.removed_entries.is_empty();
665
666 let mut updated_repositories = Vec::new();
667
668 if !repository_map.is_empty() {
669 for entry in &updated_entries {
670 if let Some(repo) = repository_map.remove(&entry.id) {
671 updated_repositories.push(repo)
672 }
673 }
674 }
675
676 let removed_repositories = if done_files {
677 mem::take(&mut message.removed_repositories)
678 } else {
679 Default::default()
680 };
681
682 if done_files {
683 updated_repositories.extend(mem::take(&mut repository_map).into_values());
684 }
685
686 Some(UpdateWorktree {
687 project_id: message.project_id,
688 worktree_id: message.worktree_id,
689 root_name: message.root_name.clone(),
690 abs_path: message.abs_path.clone(),
691 updated_entries,
692 removed_entries,
693 scan_id: message.scan_id,
694 is_last_update: done_files && message.is_last_update,
695 updated_repositories,
696 removed_repositories,
697 })
698 })
699}
700
#[cfg(test)]
mod tests {
    use super::*;

    // Builds an envelope message whose payload is an `UpdateWorktree` with a root name made
    // of "abcdefg" repeated `repeat_count` times; every other field keeps its default.
    fn worktree_envelope(repeat_count: usize) -> Message {
        let payload = envelope::Payload::UpdateWorktree(UpdateWorktree {
            root_name: "abcdefg".repeat(repeat_count),
            ..Default::default()
        });
        Message::Envelope(Envelope {
            payload: Some(payload),
            ..Default::default()
        })
    }

    // The scratch buffer must never retain more than `MAX_BUFFER_LEN` of capacity, whether
    // the message written or read was small or large.
    #[gpui::test]
    async fn test_buffer_size() {
        let (tx, rx) = futures::channel::mpsc::unbounded();
        let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
        for repeat_count in [10, 1000000] {
            sink.write(worktree_envelope(repeat_count)).await.unwrap();
            assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
        }

        let mut stream = MessageStream::new(rx.map(anyhow::Ok));
        for _ in 0..2 {
            stream.read().await.unwrap();
            assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
        }
    }

    // Packing a `PeerId` into a `u64` and unpacking it again must be lossless, including at
    // the extremes of both components.
    #[gpui::test]
    fn test_converting_peer_id_from_and_to_u64() {
        let cases = [(10, 3), (u32::MAX, 3), (10, u32::MAX), (u32::MAX, u32::MAX)];
        for (owner_id, id) in cases {
            let peer_id = PeerId { owner_id, id };
            assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
        }
    }
}