1#![allow(non_snake_case)]
2
3use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
4use anyhow::{anyhow, Result};
5use async_tungstenite::tungstenite::Message as WebSocketMessage;
6use collections::HashMap;
7use futures::{SinkExt as _, StreamExt as _};
8use prost::Message as _;
9use serde::Serialize;
10use std::any::{Any, TypeId};
11use std::time::Instant;
12use std::{
13 cmp,
14 fmt::Debug,
15 io, iter,
16 time::{Duration, SystemTime, UNIX_EPOCH},
17};
18use std::{fmt, mem};
19
20include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
21
22pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
23 const NAME: &'static str;
24 const PRIORITY: MessagePriority;
25 fn into_envelope(
26 self,
27 id: u32,
28 responding_to: Option<u32>,
29 original_sender_id: Option<PeerId>,
30 ) -> Envelope;
31 fn from_envelope(envelope: Envelope) -> Option<Self>;
32}
33
34pub trait EntityMessage: EnvelopedMessage {
35 type Entity;
36 fn remote_entity_id(&self) -> u64;
37}
38
39pub trait RequestMessage: EnvelopedMessage {
40 type Response: EnvelopedMessage;
41}
42
43pub trait AnyTypedEnvelope: 'static + Send + Sync {
44 fn payload_type_id(&self) -> TypeId;
45 fn payload_type_name(&self) -> &'static str;
46 fn as_any(&self) -> &dyn Any;
47 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
48 fn is_background(&self) -> bool;
49 fn original_sender_id(&self) -> Option<PeerId>;
50 fn sender_id(&self) -> ConnectionId;
51 fn message_id(&self) -> u32;
52}
53
54pub enum MessagePriority {
55 Foreground,
56 Background,
57}
58
59impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
60 fn payload_type_id(&self) -> TypeId {
61 TypeId::of::<T>()
62 }
63
64 fn payload_type_name(&self) -> &'static str {
65 T::NAME
66 }
67
68 fn as_any(&self) -> &dyn Any {
69 self
70 }
71
72 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
73 self
74 }
75
76 fn is_background(&self) -> bool {
77 matches!(T::PRIORITY, MessagePriority::Background)
78 }
79
80 fn original_sender_id(&self) -> Option<PeerId> {
81 self.original_sender_id
82 }
83
84 fn sender_id(&self) -> ConnectionId {
85 self.sender_id
86 }
87
88 fn message_id(&self) -> u32 {
89 self.message_id
90 }
91}
92
93impl PeerId {
94 pub fn from_u64(peer_id: u64) -> Self {
95 let owner_id = (peer_id >> 32) as u32;
96 let id = peer_id as u32;
97 Self { owner_id, id }
98 }
99
100 pub fn as_u64(self) -> u64 {
101 ((self.owner_id as u64) << 32) | (self.id as u64)
102 }
103}
104
105impl Copy for PeerId {}
106
107impl Eq for PeerId {}
108
109impl Ord for PeerId {
110 fn cmp(&self, other: &Self) -> cmp::Ordering {
111 self.owner_id
112 .cmp(&other.owner_id)
113 .then_with(|| self.id.cmp(&other.id))
114 }
115}
116
117impl PartialOrd for PeerId {
118 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
119 Some(self.cmp(other))
120 }
121}
122
123impl std::hash::Hash for PeerId {
124 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
125 self.owner_id.hash(state);
126 self.id.hash(state);
127 }
128}
129
130impl fmt::Display for PeerId {
131 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
132 write!(f, "{}/{}", self.owner_id, self.id)
133 }
134}
135
136messages!(
137 (Ack, Foreground),
138 (AckBufferOperation, Background),
139 (AckChannelMessage, Background),
140 (AddNotification, Foreground),
141 (AddProjectCollaborator, Foreground),
142 (ApplyCodeAction, Background),
143 (ApplyCodeActionResponse, Background),
144 (ApplyCompletionAdditionalEdits, Background),
145 (ApplyCompletionAdditionalEditsResponse, Background),
146 (BufferReloaded, Foreground),
147 (BufferSaved, Foreground),
148 (Call, Foreground),
149 (CallCanceled, Foreground),
150 (CancelCall, Foreground),
151 (ChannelMessageSent, Foreground),
152 (ChannelMessageUpdate, Foreground),
153 (CompleteWithLanguageModel, Background),
154 (ComputeEmbeddings, Background),
155 (ComputeEmbeddingsResponse, Background),
156 (CopyProjectEntry, Foreground),
157 (CountTokensWithLanguageModel, Background),
158 (CountTokensResponse, Background),
159 (CreateBufferForPeer, Foreground),
160 (CreateChannel, Foreground),
161 (CreateChannelResponse, Foreground),
162 (CreateProjectEntry, Foreground),
163 (CreateRoom, Foreground),
164 (CreateRoomResponse, Foreground),
165 (DeclineCall, Foreground),
166 (DeleteChannel, Foreground),
167 (DeleteNotification, Foreground),
168 (UpdateNotification, Foreground),
169 (DeleteProjectEntry, Foreground),
170 (EndStream, Foreground),
171 (Error, Foreground),
172 (ExpandProjectEntry, Foreground),
173 (ExpandProjectEntryResponse, Foreground),
174 (Follow, Foreground),
175 (FollowResponse, Foreground),
176 (FormatBuffers, Foreground),
177 (FormatBuffersResponse, Foreground),
178 (FuzzySearchUsers, Foreground),
179 (GetCachedEmbeddings, Background),
180 (GetCachedEmbeddingsResponse, Background),
181 (GetChannelMembers, Foreground),
182 (GetChannelMembersResponse, Foreground),
183 (GetChannelMessages, Background),
184 (GetChannelMessagesById, Background),
185 (GetChannelMessagesResponse, Background),
186 (GetCodeActions, Background),
187 (GetCodeActionsResponse, Background),
188 (GetCompletions, Background),
189 (GetCompletionsResponse, Background),
190 (GetDefinition, Background),
191 (GetDefinitionResponse, Background),
192 (GetDocumentHighlights, Background),
193 (GetDocumentHighlightsResponse, Background),
194 (GetHover, Background),
195 (GetHoverResponse, Background),
196 (GetNotifications, Foreground),
197 (GetNotificationsResponse, Foreground),
198 (GetPrivateUserInfo, Foreground),
199 (GetPrivateUserInfoResponse, Foreground),
200 (GetProjectSymbols, Background),
201 (GetProjectSymbolsResponse, Background),
202 (GetReferences, Background),
203 (GetReferencesResponse, Background),
204 (GetSupermavenApiKey, Background),
205 (GetSupermavenApiKeyResponse, Background),
206 (GetTypeDefinition, Background),
207 (GetTypeDefinitionResponse, Background),
208 (GetImplementation, Background),
209 (GetImplementationResponse, Background),
210 (GetUsers, Foreground),
211 (Hello, Foreground),
212 (IncomingCall, Foreground),
213 (InlayHints, Background),
214 (InlayHintsResponse, Background),
215 (InviteChannelMember, Foreground),
216 (JoinChannel, Foreground),
217 (JoinChannelBuffer, Foreground),
218 (JoinChannelBufferResponse, Foreground),
219 (JoinChannelChat, Foreground),
220 (JoinChannelChatResponse, Foreground),
221 (JoinProject, Foreground),
222 (JoinHostedProject, Foreground),
223 (JoinProjectResponse, Foreground),
224 (JoinRoom, Foreground),
225 (JoinRoomResponse, Foreground),
226 (LanguageModelResponse, Background),
227 (LeaveChannelBuffer, Background),
228 (LeaveChannelChat, Foreground),
229 (LeaveProject, Foreground),
230 (LeaveRoom, Foreground),
231 (MarkNotificationRead, Foreground),
232 (MoveChannel, Foreground),
233 (OnTypeFormatting, Background),
234 (OnTypeFormattingResponse, Background),
235 (OpenBufferById, Background),
236 (OpenBufferByPath, Background),
237 (OpenBufferForSymbol, Background),
238 (OpenBufferForSymbolResponse, Background),
239 (OpenBufferResponse, Background),
240 (PerformRename, Background),
241 (PerformRenameResponse, Background),
242 (Ping, Foreground),
243 (PrepareRename, Background),
244 (PrepareRenameResponse, Background),
245 (ProjectEntryResponse, Foreground),
246 (RefreshInlayHints, Foreground),
247 (RejoinChannelBuffers, Foreground),
248 (RejoinChannelBuffersResponse, Foreground),
249 (RejoinRoom, Foreground),
250 (RejoinRoomResponse, Foreground),
251 (ReloadBuffers, Foreground),
252 (ReloadBuffersResponse, Foreground),
253 (RemoveChannelMember, Foreground),
254 (RemoveChannelMessage, Foreground),
255 (UpdateChannelMessage, Foreground),
256 (RemoveContact, Foreground),
257 (RemoveProjectCollaborator, Foreground),
258 (RenameChannel, Foreground),
259 (RenameChannelResponse, Foreground),
260 (RenameProjectEntry, Foreground),
261 (RequestContact, Foreground),
262 (ResolveCompletionDocumentation, Background),
263 (ResolveCompletionDocumentationResponse, Background),
264 (ResolveInlayHint, Background),
265 (ResolveInlayHintResponse, Background),
266 (RespondToChannelInvite, Foreground),
267 (RespondToContactRequest, Foreground),
268 (RoomUpdated, Foreground),
269 (SaveBuffer, Foreground),
270 (SetChannelMemberRole, Foreground),
271 (SetChannelVisibility, Foreground),
272 (SearchProject, Background),
273 (SearchProjectResponse, Background),
274 (SendChannelMessage, Background),
275 (SendChannelMessageResponse, Background),
276 (ShareProject, Foreground),
277 (ShareProjectResponse, Foreground),
278 (ShowContacts, Foreground),
279 (StartLanguageServer, Foreground),
280 (SynchronizeBuffers, Foreground),
281 (SynchronizeBuffersResponse, Foreground),
282 (TaskContextForLocation, Background),
283 (TaskContext, Background),
284 (TaskTemplates, Background),
285 (TaskTemplatesResponse, Background),
286 (Test, Foreground),
287 (Unfollow, Foreground),
288 (UnshareProject, Foreground),
289 (UpdateBuffer, Foreground),
290 (UpdateBufferFile, Foreground),
291 (UpdateChannelBuffer, Foreground),
292 (UpdateChannelBufferCollaborators, Foreground),
293 (UpdateChannels, Foreground),
294 (UpdateUserChannels, Foreground),
295 (UpdateContacts, Foreground),
296 (UpdateDiagnosticSummary, Foreground),
297 (UpdateDiffBase, Foreground),
298 (UpdateFollowers, Foreground),
299 (UpdateInviteInfo, Foreground),
300 (UpdateLanguageServer, Foreground),
301 (UpdateParticipantLocation, Foreground),
302 (UpdateProject, Foreground),
303 (UpdateProjectCollaborator, Foreground),
304 (UpdateWorktree, Foreground),
305 (UpdateWorktreeSettings, Foreground),
306 (UsersResponse, Foreground),
307 (LspExtExpandMacro, Background),
308 (LspExtExpandMacroResponse, Background),
309 (SetRoomParticipantRole, Foreground),
310 (BlameBuffer, Foreground),
311 (BlameBufferResponse, Foreground),
312 (CreateDevServerProject, Background),
313 (CreateDevServerProjectResponse, Foreground),
314 (CreateDevServer, Foreground),
315 (CreateDevServerResponse, Foreground),
316 (DevServerInstructions, Foreground),
317 (ShutdownDevServer, Foreground),
318 (ReconnectDevServer, Foreground),
319 (ReconnectDevServerResponse, Foreground),
320 (ShareDevServerProject, Foreground),
321 (JoinDevServerProject, Foreground),
322 (RejoinRemoteProjects, Foreground),
323 (RejoinRemoteProjectsResponse, Foreground),
324 (MultiLspQuery, Background),
325 (MultiLspQueryResponse, Background),
326 (DevServerProjectsUpdate, Foreground),
327 (ValidateDevServerProjectRequest, Background),
328 (DeleteDevServer, Foreground),
329 (DeleteDevServerProject, Foreground),
330 (RegenerateDevServerToken, Foreground),
331 (RegenerateDevServerTokenResponse, Foreground),
332 (RenameDevServer, Foreground),
333 (OpenNewBuffer, Foreground),
334);
335
336request_messages!(
337 (ApplyCodeAction, ApplyCodeActionResponse),
338 (
339 ApplyCompletionAdditionalEdits,
340 ApplyCompletionAdditionalEditsResponse
341 ),
342 (Call, Ack),
343 (CancelCall, Ack),
344 (CopyProjectEntry, ProjectEntryResponse),
345 (CompleteWithLanguageModel, LanguageModelResponse),
346 (ComputeEmbeddings, ComputeEmbeddingsResponse),
347 (CountTokensWithLanguageModel, CountTokensResponse),
348 (CreateChannel, CreateChannelResponse),
349 (CreateProjectEntry, ProjectEntryResponse),
350 (CreateRoom, CreateRoomResponse),
351 (DeclineCall, Ack),
352 (DeleteChannel, Ack),
353 (DeleteProjectEntry, ProjectEntryResponse),
354 (ExpandProjectEntry, ExpandProjectEntryResponse),
355 (Follow, FollowResponse),
356 (FormatBuffers, FormatBuffersResponse),
357 (FuzzySearchUsers, UsersResponse),
358 (GetCachedEmbeddings, GetCachedEmbeddingsResponse),
359 (GetChannelMembers, GetChannelMembersResponse),
360 (GetChannelMessages, GetChannelMessagesResponse),
361 (GetChannelMessagesById, GetChannelMessagesResponse),
362 (GetCodeActions, GetCodeActionsResponse),
363 (GetCompletions, GetCompletionsResponse),
364 (GetDefinition, GetDefinitionResponse),
365 (GetImplementation, GetImplementationResponse),
366 (GetDocumentHighlights, GetDocumentHighlightsResponse),
367 (GetHover, GetHoverResponse),
368 (GetNotifications, GetNotificationsResponse),
369 (GetPrivateUserInfo, GetPrivateUserInfoResponse),
370 (GetProjectSymbols, GetProjectSymbolsResponse),
371 (GetReferences, GetReferencesResponse),
372 (GetSupermavenApiKey, GetSupermavenApiKeyResponse),
373 (GetTypeDefinition, GetTypeDefinitionResponse),
374 (GetUsers, UsersResponse),
375 (IncomingCall, Ack),
376 (InlayHints, InlayHintsResponse),
377 (InviteChannelMember, Ack),
378 (JoinChannel, JoinRoomResponse),
379 (JoinChannelBuffer, JoinChannelBufferResponse),
380 (JoinChannelChat, JoinChannelChatResponse),
381 (JoinHostedProject, JoinProjectResponse),
382 (JoinProject, JoinProjectResponse),
383 (JoinRoom, JoinRoomResponse),
384 (LeaveChannelBuffer, Ack),
385 (LeaveRoom, Ack),
386 (MarkNotificationRead, Ack),
387 (MoveChannel, Ack),
388 (OnTypeFormatting, OnTypeFormattingResponse),
389 (OpenBufferById, OpenBufferResponse),
390 (OpenBufferByPath, OpenBufferResponse),
391 (OpenBufferForSymbol, OpenBufferForSymbolResponse),
392 (OpenNewBuffer, OpenBufferResponse),
393 (PerformRename, PerformRenameResponse),
394 (Ping, Ack),
395 (PrepareRename, PrepareRenameResponse),
396 (RefreshInlayHints, Ack),
397 (RejoinChannelBuffers, RejoinChannelBuffersResponse),
398 (RejoinRoom, RejoinRoomResponse),
399 (ReloadBuffers, ReloadBuffersResponse),
400 (RemoveChannelMember, Ack),
401 (RemoveChannelMessage, Ack),
402 (UpdateChannelMessage, Ack),
403 (RemoveContact, Ack),
404 (RenameChannel, RenameChannelResponse),
405 (RenameProjectEntry, ProjectEntryResponse),
406 (RequestContact, Ack),
407 (
408 ResolveCompletionDocumentation,
409 ResolveCompletionDocumentationResponse
410 ),
411 (ResolveInlayHint, ResolveInlayHintResponse),
412 (RespondToChannelInvite, Ack),
413 (RespondToContactRequest, Ack),
414 (SaveBuffer, BufferSaved),
415 (SearchProject, SearchProjectResponse),
416 (SendChannelMessage, SendChannelMessageResponse),
417 (SetChannelMemberRole, Ack),
418 (SetChannelVisibility, Ack),
419 (ShareProject, ShareProjectResponse),
420 (SynchronizeBuffers, SynchronizeBuffersResponse),
421 (TaskContextForLocation, TaskContext),
422 (TaskTemplates, TaskTemplatesResponse),
423 (Test, Test),
424 (UpdateBuffer, Ack),
425 (UpdateParticipantLocation, Ack),
426 (UpdateProject, Ack),
427 (UpdateWorktree, Ack),
428 (LspExtExpandMacro, LspExtExpandMacroResponse),
429 (SetRoomParticipantRole, Ack),
430 (BlameBuffer, BlameBufferResponse),
431 (CreateDevServerProject, CreateDevServerProjectResponse),
432 (CreateDevServer, CreateDevServerResponse),
433 (ShutdownDevServer, Ack),
434 (ShareDevServerProject, ShareProjectResponse),
435 (JoinDevServerProject, JoinProjectResponse),
436 (RejoinRemoteProjects, RejoinRemoteProjectsResponse),
437 (ReconnectDevServer, ReconnectDevServerResponse),
438 (ValidateDevServerProjectRequest, Ack),
439 (MultiLspQuery, MultiLspQueryResponse),
440 (DeleteDevServer, Ack),
441 (DeleteDevServerProject, Ack),
442 (RegenerateDevServerToken, RegenerateDevServerTokenResponse),
443 (RenameDevServer, Ack)
444);
445
446entity_messages!(
447 {project_id, ShareProject},
448 AddProjectCollaborator,
449 ApplyCodeAction,
450 ApplyCompletionAdditionalEdits,
451 BlameBuffer,
452 BufferReloaded,
453 BufferSaved,
454 CopyProjectEntry,
455 CreateBufferForPeer,
456 CreateProjectEntry,
457 DeleteProjectEntry,
458 ExpandProjectEntry,
459 FormatBuffers,
460 GetCodeActions,
461 GetCompletions,
462 GetDefinition,
463 GetImplementation,
464 GetDocumentHighlights,
465 GetHover,
466 GetProjectSymbols,
467 GetReferences,
468 GetTypeDefinition,
469 InlayHints,
470 JoinProject,
471 LeaveProject,
472 MultiLspQuery,
473 OnTypeFormatting,
474 OpenNewBuffer,
475 OpenBufferById,
476 OpenBufferByPath,
477 OpenBufferForSymbol,
478 PerformRename,
479 PrepareRename,
480 RefreshInlayHints,
481 ReloadBuffers,
482 RemoveProjectCollaborator,
483 RenameProjectEntry,
484 ResolveCompletionDocumentation,
485 ResolveInlayHint,
486 SaveBuffer,
487 SearchProject,
488 StartLanguageServer,
489 SynchronizeBuffers,
490 TaskContextForLocation,
491 TaskTemplates,
492 UnshareProject,
493 UpdateBuffer,
494 UpdateBufferFile,
495 UpdateDiagnosticSummary,
496 UpdateDiffBase,
497 UpdateLanguageServer,
498 UpdateProject,
499 UpdateProjectCollaborator,
500 UpdateWorktree,
501 UpdateWorktreeSettings,
502 LspExtExpandMacro,
503);
504
505entity_messages!(
506 {channel_id, Channel},
507 ChannelMessageSent,
508 ChannelMessageUpdate,
509 RemoveChannelMessage,
510 UpdateChannelMessage,
511 UpdateChannelBuffer,
512 UpdateChannelBufferCollaborators,
513);
514
515const KIB: usize = 1024;
516const MIB: usize = KIB * 1024;
517const MAX_BUFFER_LEN: usize = MIB;
518
519/// A stream of protobuf messages.
520pub struct MessageStream<S> {
521 stream: S,
522 encoding_buffer: Vec<u8>,
523}
524
525#[allow(clippy::large_enum_variant)]
526#[derive(Debug)]
527pub enum Message {
528 Envelope(Envelope),
529 Ping,
530 Pong,
531}
532
533impl<S> MessageStream<S> {
534 pub fn new(stream: S) -> Self {
535 Self {
536 stream,
537 encoding_buffer: Vec::new(),
538 }
539 }
540
541 pub fn inner_mut(&mut self) -> &mut S {
542 &mut self.stream
543 }
544}
545
546impl<S> MessageStream<S>
547where
548 S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
549{
550 pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
551 #[cfg(any(test, feature = "test-support"))]
552 const COMPRESSION_LEVEL: i32 = -7;
553
554 #[cfg(not(any(test, feature = "test-support")))]
555 const COMPRESSION_LEVEL: i32 = 4;
556
557 match message {
558 Message::Envelope(message) => {
559 self.encoding_buffer.reserve(message.encoded_len());
560 message
561 .encode(&mut self.encoding_buffer)
562 .map_err(io::Error::from)?;
563 let buffer =
564 zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
565 .unwrap();
566
567 self.encoding_buffer.clear();
568 self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
569 self.stream.send(WebSocketMessage::Binary(buffer)).await?;
570 }
571 Message::Ping => {
572 self.stream
573 .send(WebSocketMessage::Ping(Default::default()))
574 .await?;
575 }
576 Message::Pong => {
577 self.stream
578 .send(WebSocketMessage::Pong(Default::default()))
579 .await?;
580 }
581 }
582
583 Ok(())
584 }
585}
586
587impl<S> MessageStream<S>
588where
589 S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
590{
591 pub async fn read(&mut self) -> Result<(Message, Instant), anyhow::Error> {
592 while let Some(bytes) = self.stream.next().await {
593 let received_at = Instant::now();
594 match bytes? {
595 WebSocketMessage::Binary(bytes) => {
596 zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
597 let envelope = Envelope::decode(self.encoding_buffer.as_slice())
598 .map_err(io::Error::from)?;
599
600 self.encoding_buffer.clear();
601 self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
602 return Ok((Message::Envelope(envelope), received_at));
603 }
604 WebSocketMessage::Ping(_) => return Ok((Message::Ping, received_at)),
605 WebSocketMessage::Pong(_) => return Ok((Message::Pong, received_at)),
606 WebSocketMessage::Close(_) => break,
607 _ => {}
608 }
609 }
610 Err(anyhow!("connection closed"))
611 }
612}
613
614impl From<Timestamp> for SystemTime {
615 fn from(val: Timestamp) -> Self {
616 UNIX_EPOCH
617 .checked_add(Duration::new(val.seconds, val.nanos))
618 .unwrap()
619 }
620}
621
622impl From<SystemTime> for Timestamp {
623 fn from(time: SystemTime) -> Self {
624 let duration = time.duration_since(UNIX_EPOCH).unwrap();
625 Self {
626 seconds: duration.as_secs(),
627 nanos: duration.subsec_nanos(),
628 }
629 }
630}
631
632impl From<u128> for Nonce {
633 fn from(nonce: u128) -> Self {
634 let upper_half = (nonce >> 64) as u64;
635 let lower_half = nonce as u64;
636 Self {
637 upper_half,
638 lower_half,
639 }
640 }
641}
642
643impl From<Nonce> for u128 {
644 fn from(nonce: Nonce) -> Self {
645 let upper_half = (nonce.upper_half as u128) << 64;
646 let lower_half = nonce.lower_half as u128;
647 upper_half | lower_half
648 }
649}
650
651pub fn split_worktree_update(
652 mut message: UpdateWorktree,
653 max_chunk_size: usize,
654) -> impl Iterator<Item = UpdateWorktree> {
655 let mut done_files = false;
656
657 let mut repository_map = message
658 .updated_repositories
659 .into_iter()
660 .map(|repo| (repo.work_directory_id, repo))
661 .collect::<HashMap<_, _>>();
662
663 iter::from_fn(move || {
664 if done_files {
665 return None;
666 }
667
668 let updated_entries_chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
669 let updated_entries: Vec<_> = message
670 .updated_entries
671 .drain(..updated_entries_chunk_size)
672 .collect();
673
674 let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size);
675 let removed_entries = message
676 .removed_entries
677 .drain(..removed_entries_chunk_size)
678 .collect();
679
680 done_files = message.updated_entries.is_empty() && message.removed_entries.is_empty();
681
682 let mut updated_repositories = Vec::new();
683
684 if !repository_map.is_empty() {
685 for entry in &updated_entries {
686 if let Some(repo) = repository_map.remove(&entry.id) {
687 updated_repositories.push(repo)
688 }
689 }
690 }
691
692 let removed_repositories = if done_files {
693 mem::take(&mut message.removed_repositories)
694 } else {
695 Default::default()
696 };
697
698 if done_files {
699 updated_repositories.extend(mem::take(&mut repository_map).into_values());
700 }
701
702 Some(UpdateWorktree {
703 project_id: message.project_id,
704 worktree_id: message.worktree_id,
705 root_name: message.root_name.clone(),
706 abs_path: message.abs_path.clone(),
707 updated_entries,
708 removed_entries,
709 scan_id: message.scan_id,
710 is_last_update: done_files && message.is_last_update,
711 updated_repositories,
712 removed_repositories,
713 })
714 })
715}
716
717#[cfg(test)]
718mod tests {
719 use super::*;
720
721 #[gpui::test]
722 async fn test_buffer_size() {
723 let (tx, rx) = futures::channel::mpsc::unbounded();
724 let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
725 sink.write(Message::Envelope(Envelope {
726 payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
727 root_name: "abcdefg".repeat(10),
728 ..Default::default()
729 })),
730 ..Default::default()
731 }))
732 .await
733 .unwrap();
734 assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
735 sink.write(Message::Envelope(Envelope {
736 payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
737 root_name: "abcdefg".repeat(1000000),
738 ..Default::default()
739 })),
740 ..Default::default()
741 }))
742 .await
743 .unwrap();
744 assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
745
746 let mut stream = MessageStream::new(rx.map(anyhow::Ok));
747 stream.read().await.unwrap();
748 assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
749 stream.read().await.unwrap();
750 assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
751 }
752
753 #[gpui::test]
754 fn test_converting_peer_id_from_and_to_u64() {
755 let peer_id = PeerId {
756 owner_id: 10,
757 id: 3,
758 };
759 assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
760 let peer_id = PeerId {
761 owner_id: u32::MAX,
762 id: 3,
763 };
764 assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
765 let peer_id = PeerId {
766 owner_id: 10,
767 id: u32::MAX,
768 };
769 assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
770 let peer_id = PeerId {
771 owner_id: u32::MAX,
772 id: u32::MAX,
773 };
774 assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
775 }
776}