1#![allow(non_snake_case)]
2
3use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
4use anyhow::{anyhow, Result};
5use async_tungstenite::tungstenite::Message as WebSocketMessage;
6use collections::HashMap;
7use futures::{SinkExt as _, StreamExt as _};
8use prost::Message as _;
9use serde::Serialize;
10use std::any::{Any, TypeId};
11use std::time::Instant;
12use std::{
13 cmp,
14 fmt::Debug,
15 io, iter,
16 time::{Duration, SystemTime, UNIX_EPOCH},
17};
18use std::{fmt, mem};
19
// Pull in the Rust source found at `$OUT_DIR/zed.messages.rs` at compile time.
// Types used below that are not declared in this file (`Envelope`, `PeerId`,
// `Timestamp`, `Nonce`, `UpdateWorktree`, ...) presumably come from this
// generated file — TODO confirm against the crate's build script.
include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
21
/// A message type that can be wrapped into, and recovered from, an `Envelope`.
///
/// NOTE(review): implementations are presumably generated by the `messages!`
/// invocation later in this file; the macro body is not visible here.
pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
    /// Name of the message type (reported by `AnyTypedEnvelope::payload_type_name`).
    const NAME: &'static str;
    /// Priority class of the message type (see `MessagePriority`).
    const PRIORITY: MessagePriority;
    /// Wraps `self` into an `Envelope`.
    ///
    /// * `id` - id to assign to the envelope.
    /// * `responding_to` - id of the message this one answers, if any.
    /// * `original_sender_id` - peer that originally sent the message, if any.
    fn into_envelope(
        self,
        id: u32,
        responding_to: Option<u32>,
        original_sender_id: Option<PeerId>,
    ) -> Envelope;
    /// Tries to extract a message of this type from `envelope`.
    ///
    /// Returns `None` when extraction is not possible — presumably when the
    /// envelope carries a different payload type (confirm in the macro).
    fn from_envelope(envelope: Envelope) -> Option<Self>;
}
33
/// An enveloped message that is addressed to a specific remote entity.
pub trait EntityMessage: EnvelopedMessage {
    /// Type associated with the entity this message targets.
    type Entity;
    /// Returns the id of the remote entity this message refers to.
    fn remote_entity_id(&self) -> u64;
}
38
/// An enveloped message that expects a reply of a known type.
pub trait RequestMessage: EnvelopedMessage {
    /// The message type sent back in response to this request.
    type Response: EnvelopedMessage;
}
42
/// Type-erased view of a `TypedEnvelope<T>`, letting envelopes with different
/// payload types be handled through a single trait object.
pub trait AnyTypedEnvelope: 'static + Send + Sync {
    /// `TypeId` of the payload message type.
    fn payload_type_id(&self) -> TypeId;
    /// Name of the payload message type.
    fn payload_type_name(&self) -> &'static str;
    /// Borrows the envelope as `&dyn Any` so it can be downcast by reference.
    fn as_any(&self) -> &dyn Any;
    /// Converts the boxed envelope into a boxed `Any` so it can be downcast by value.
    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
    /// Whether the payload's priority is `MessagePriority::Background`.
    fn is_background(&self) -> bool;
    /// Peer that originally sent the message, if recorded on the envelope.
    fn original_sender_id(&self) -> Option<PeerId>;
    /// Connection the envelope was received from.
    fn sender_id(&self) -> ConnectionId;
    /// Id of the message carried by the envelope.
    fn message_id(&self) -> u32;
}
53
/// Priority class attached to every message type through
/// `EnvelopedMessage::PRIORITY`.
///
/// NOTE(review): how the two classes are scheduled is decided by code outside
/// this file; only the classification itself is visible here.
pub enum MessagePriority {
    /// The message type is classified as foreground.
    Foreground,
    /// The message type is classified as background
    /// (`AnyTypedEnvelope::is_background` returns `true` for it).
    Background,
}
58
59impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
60 fn payload_type_id(&self) -> TypeId {
61 TypeId::of::<T>()
62 }
63
64 fn payload_type_name(&self) -> &'static str {
65 T::NAME
66 }
67
68 fn as_any(&self) -> &dyn Any {
69 self
70 }
71
72 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
73 self
74 }
75
76 fn is_background(&self) -> bool {
77 matches!(T::PRIORITY, MessagePriority::Background)
78 }
79
80 fn original_sender_id(&self) -> Option<PeerId> {
81 self.original_sender_id
82 }
83
84 fn sender_id(&self) -> ConnectionId {
85 self.sender_id
86 }
87
88 fn message_id(&self) -> u32 {
89 self.message_id
90 }
91}
92
93impl PeerId {
94 pub fn from_u64(peer_id: u64) -> Self {
95 let owner_id = (peer_id >> 32) as u32;
96 let id = peer_id as u32;
97 Self { owner_id, id }
98 }
99
100 pub fn as_u64(self) -> u64 {
101 ((self.owner_id as u64) << 32) | (self.id as u64)
102 }
103}
104
// `PeerId` is passed and returned by value (see `PeerId::as_u64`), so mark it
// `Copy`. This relies on a `Clone` impl that is not declared in this file.
impl Copy for PeerId {}
106
// Marker impl: equality on `PeerId` is a full equivalence relation, which is
// required by the `Ord` impl below. The `PartialEq` impl it builds on is not
// declared in this file.
impl Eq for PeerId {}
108
109impl Ord for PeerId {
110 fn cmp(&self, other: &Self) -> cmp::Ordering {
111 self.owner_id
112 .cmp(&other.owner_id)
113 .then_with(|| self.id.cmp(&other.id))
114 }
115}
116
117impl PartialOrd for PeerId {
118 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
119 Some(self.cmp(other))
120 }
121}
122
123impl std::hash::Hash for PeerId {
124 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
125 self.owner_id.hash(state);
126 self.id.hash(state);
127 }
128}
129
130impl fmt::Display for PeerId {
131 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
132 write!(f, "{}/{}", self.owner_id, self.id)
133 }
134}
135
// Registers every message type together with its `MessagePriority` variant.
// The `messages!` macro is imported from the parent module and its expansion
// is not visible here; presumably it implements `EnvelopedMessage` for each
// listed type with the given priority — TODO confirm in the macro definition.
messages!(
    (Ack, Foreground),
    (AckBufferOperation, Background),
    (AckChannelMessage, Background),
    (AddNotification, Foreground),
    (AddProjectCollaborator, Foreground),
    (ApplyCodeAction, Background),
    (ApplyCodeActionResponse, Background),
    (ApplyCompletionAdditionalEdits, Background),
    (ApplyCompletionAdditionalEditsResponse, Background),
    (BufferReloaded, Foreground),
    (BufferSaved, Foreground),
    (Call, Foreground),
    (CallCanceled, Foreground),
    (CancelCall, Foreground),
    (ChannelMessageSent, Foreground),
    (ChannelMessageUpdate, Foreground),
    (CompleteWithLanguageModel, Background),
    (ComputeEmbeddings, Background),
    (ComputeEmbeddingsResponse, Background),
    (CopyProjectEntry, Foreground),
    (CountTokensWithLanguageModel, Background),
    (CountTokensResponse, Background),
    (CreateBufferForPeer, Foreground),
    (CreateChannel, Foreground),
    (CreateChannelResponse, Foreground),
    (CreateProjectEntry, Foreground),
    (CreateRoom, Foreground),
    (CreateRoomResponse, Foreground),
    (DeclineCall, Foreground),
    (DeleteChannel, Foreground),
    (DeleteNotification, Foreground),
    (UpdateNotification, Foreground),
    (DeleteProjectEntry, Foreground),
    (EndStream, Foreground),
    (Error, Foreground),
    (ExpandProjectEntry, Foreground),
    (ExpandProjectEntryResponse, Foreground),
    (Follow, Foreground),
    (FollowResponse, Foreground),
    (FormatBuffers, Foreground),
    (FormatBuffersResponse, Foreground),
    (FuzzySearchUsers, Foreground),
    (GetCachedEmbeddings, Background),
    (GetCachedEmbeddingsResponse, Background),
    (GetChannelMembers, Foreground),
    (GetChannelMembersResponse, Foreground),
    (GetChannelMessages, Background),
    (GetChannelMessagesById, Background),
    (GetChannelMessagesResponse, Background),
    (GetCodeActions, Background),
    (GetCodeActionsResponse, Background),
    (GetCompletions, Background),
    (GetCompletionsResponse, Background),
    (GetDefinition, Background),
    (GetDefinitionResponse, Background),
    (GetDocumentHighlights, Background),
    (GetDocumentHighlightsResponse, Background),
    (GetHover, Background),
    (GetHoverResponse, Background),
    (GetNotifications, Foreground),
    (GetNotificationsResponse, Foreground),
    (GetPrivateUserInfo, Foreground),
    (GetPrivateUserInfoResponse, Foreground),
    (GetProjectSymbols, Background),
    (GetProjectSymbolsResponse, Background),
    (GetReferences, Background),
    (GetReferencesResponse, Background),
    (GetSupermavenApiKey, Background),
    (GetSupermavenApiKeyResponse, Background),
    (GetTypeDefinition, Background),
    (GetTypeDefinitionResponse, Background),
    (GetImplementation, Background),
    (GetImplementationResponse, Background),
    (GetUsers, Foreground),
    (Hello, Foreground),
    (IncomingCall, Foreground),
    (InlayHints, Background),
    (InlayHintsResponse, Background),
    (InviteChannelMember, Foreground),
    (JoinChannel, Foreground),
    (JoinChannelBuffer, Foreground),
    (JoinChannelBufferResponse, Foreground),
    (JoinChannelChat, Foreground),
    (JoinChannelChatResponse, Foreground),
    (JoinProject, Foreground),
    (JoinHostedProject, Foreground),
    (JoinProjectResponse, Foreground),
    (JoinRoom, Foreground),
    (JoinRoomResponse, Foreground),
    (LanguageModelResponse, Background),
    (LeaveChannelBuffer, Background),
    (LeaveChannelChat, Foreground),
    (LeaveProject, Foreground),
    (LeaveRoom, Foreground),
    (MarkNotificationRead, Foreground),
    (MoveChannel, Foreground),
    (OnTypeFormatting, Background),
    (OnTypeFormattingResponse, Background),
    (OpenBufferById, Background),
    (OpenBufferByPath, Background),
    (OpenBufferForSymbol, Background),
    (OpenBufferForSymbolResponse, Background),
    (OpenBufferResponse, Background),
    (PerformRename, Background),
    (PerformRenameResponse, Background),
    (Ping, Foreground),
    (PrepareRename, Background),
    (PrepareRenameResponse, Background),
    (ProjectEntryResponse, Foreground),
    (RefreshInlayHints, Foreground),
    (RejoinChannelBuffers, Foreground),
    (RejoinChannelBuffersResponse, Foreground),
    (RejoinRoom, Foreground),
    (RejoinRoomResponse, Foreground),
    (ReloadBuffers, Foreground),
    (ReloadBuffersResponse, Foreground),
    (RemoveChannelMember, Foreground),
    (RemoveChannelMessage, Foreground),
    (UpdateChannelMessage, Foreground),
    (RemoveContact, Foreground),
    (RemoveProjectCollaborator, Foreground),
    (RenameChannel, Foreground),
    (RenameChannelResponse, Foreground),
    (RenameProjectEntry, Foreground),
    (RequestContact, Foreground),
    (ResolveCompletionDocumentation, Background),
    (ResolveCompletionDocumentationResponse, Background),
    (ResolveInlayHint, Background),
    (ResolveInlayHintResponse, Background),
    (RespondToChannelInvite, Foreground),
    (RespondToContactRequest, Foreground),
    (RoomUpdated, Foreground),
    (SaveBuffer, Foreground),
    (SetChannelMemberRole, Foreground),
    (SetChannelVisibility, Foreground),
    (SearchProject, Background),
    (SearchProjectResponse, Background),
    (SendChannelMessage, Background),
    (SendChannelMessageResponse, Background),
    (ShareProject, Foreground),
    (ShareProjectResponse, Foreground),
    (ShowContacts, Foreground),
    (StartLanguageServer, Foreground),
    (SubscribeToChannels, Foreground),
    (SynchronizeBuffers, Foreground),
    (SynchronizeBuffersResponse, Foreground),
    (TaskContextForLocation, Background),
    (TaskContext, Background),
    (TaskTemplates, Background),
    (TaskTemplatesResponse, Background),
    (Test, Foreground),
    (Unfollow, Foreground),
    (UnshareProject, Foreground),
    (UpdateBuffer, Foreground),
    (UpdateBufferFile, Foreground),
    (UpdateChannelBuffer, Foreground),
    (UpdateChannelBufferCollaborators, Foreground),
    (UpdateChannels, Foreground),
    (UpdateUserChannels, Foreground),
    (UpdateContacts, Foreground),
    (UpdateDiagnosticSummary, Foreground),
    (UpdateDiffBase, Foreground),
    (UpdateFollowers, Foreground),
    (UpdateInviteInfo, Foreground),
    (UpdateLanguageServer, Foreground),
    (UpdateParticipantLocation, Foreground),
    (UpdateProject, Foreground),
    (UpdateProjectCollaborator, Foreground),
    (UpdateWorktree, Foreground),
    (UpdateWorktreeSettings, Foreground),
    (UsersResponse, Foreground),
    (LspExtExpandMacro, Background),
    (LspExtExpandMacroResponse, Background),
    (SetRoomParticipantRole, Foreground),
    (BlameBuffer, Foreground),
    (BlameBufferResponse, Foreground),
    (CreateDevServerProject, Background),
    (CreateDevServerProjectResponse, Foreground),
    (CreateDevServer, Foreground),
    (CreateDevServerResponse, Foreground),
    (DevServerInstructions, Foreground),
    (ShutdownDevServer, Foreground),
    (ReconnectDevServer, Foreground),
    (ReconnectDevServerResponse, Foreground),
    (ShareDevServerProject, Foreground),
    (JoinDevServerProject, Foreground),
    (RejoinRemoteProjects, Foreground),
    (RejoinRemoteProjectsResponse, Foreground),
    (MultiLspQuery, Background),
    (MultiLspQueryResponse, Background),
    (DevServerProjectsUpdate, Foreground),
    (ValidateDevServerProjectRequest, Background),
    (DeleteDevServer, Foreground),
    (DeleteDevServerProject, Foreground),
    (RegenerateDevServerToken, Foreground),
    (RegenerateDevServerTokenResponse, Foreground),
    (RenameDevServer, Foreground),
    (OpenNewBuffer, Foreground),
    (RestartLanguageServers, Foreground),
);
337
// Pairs each request message type with the message type sent in reply, written
// as `(Request, Response)`. The `request_messages!` macro is imported from the
// parent module; presumably it implements `RequestMessage` for the first type
// with `Response` set to the second — TODO confirm in the macro definition.
request_messages!(
    (ApplyCodeAction, ApplyCodeActionResponse),
    (
        ApplyCompletionAdditionalEdits,
        ApplyCompletionAdditionalEditsResponse
    ),
    (Call, Ack),
    (CancelCall, Ack),
    (CopyProjectEntry, ProjectEntryResponse),
    (CompleteWithLanguageModel, LanguageModelResponse),
    (ComputeEmbeddings, ComputeEmbeddingsResponse),
    (CountTokensWithLanguageModel, CountTokensResponse),
    (CreateChannel, CreateChannelResponse),
    (CreateProjectEntry, ProjectEntryResponse),
    (CreateRoom, CreateRoomResponse),
    (DeclineCall, Ack),
    (DeleteChannel, Ack),
    (DeleteProjectEntry, ProjectEntryResponse),
    (ExpandProjectEntry, ExpandProjectEntryResponse),
    (Follow, FollowResponse),
    (FormatBuffers, FormatBuffersResponse),
    (FuzzySearchUsers, UsersResponse),
    (GetCachedEmbeddings, GetCachedEmbeddingsResponse),
    (GetChannelMembers, GetChannelMembersResponse),
    (GetChannelMessages, GetChannelMessagesResponse),
    (GetChannelMessagesById, GetChannelMessagesResponse),
    (GetCodeActions, GetCodeActionsResponse),
    (GetCompletions, GetCompletionsResponse),
    (GetDefinition, GetDefinitionResponse),
    (GetImplementation, GetImplementationResponse),
    (GetDocumentHighlights, GetDocumentHighlightsResponse),
    (GetHover, GetHoverResponse),
    (GetNotifications, GetNotificationsResponse),
    (GetPrivateUserInfo, GetPrivateUserInfoResponse),
    (GetProjectSymbols, GetProjectSymbolsResponse),
    (GetReferences, GetReferencesResponse),
    (GetSupermavenApiKey, GetSupermavenApiKeyResponse),
    (GetTypeDefinition, GetTypeDefinitionResponse),
    (GetUsers, UsersResponse),
    (IncomingCall, Ack),
    (InlayHints, InlayHintsResponse),
    (InviteChannelMember, Ack),
    (JoinChannel, JoinRoomResponse),
    (JoinChannelBuffer, JoinChannelBufferResponse),
    (JoinChannelChat, JoinChannelChatResponse),
    (JoinHostedProject, JoinProjectResponse),
    (JoinProject, JoinProjectResponse),
    (JoinRoom, JoinRoomResponse),
    (LeaveChannelBuffer, Ack),
    (LeaveRoom, Ack),
    (MarkNotificationRead, Ack),
    (MoveChannel, Ack),
    (OnTypeFormatting, OnTypeFormattingResponse),
    (OpenBufferById, OpenBufferResponse),
    (OpenBufferByPath, OpenBufferResponse),
    (OpenBufferForSymbol, OpenBufferForSymbolResponse),
    (OpenNewBuffer, OpenBufferResponse),
    (PerformRename, PerformRenameResponse),
    (Ping, Ack),
    (PrepareRename, PrepareRenameResponse),
    (RefreshInlayHints, Ack),
    (RejoinChannelBuffers, RejoinChannelBuffersResponse),
    (RejoinRoom, RejoinRoomResponse),
    (ReloadBuffers, ReloadBuffersResponse),
    (RemoveChannelMember, Ack),
    (RemoveChannelMessage, Ack),
    (UpdateChannelMessage, Ack),
    (RemoveContact, Ack),
    (RenameChannel, RenameChannelResponse),
    (RenameProjectEntry, ProjectEntryResponse),
    (RequestContact, Ack),
    (
        ResolveCompletionDocumentation,
        ResolveCompletionDocumentationResponse
    ),
    (ResolveInlayHint, ResolveInlayHintResponse),
    (RespondToChannelInvite, Ack),
    (RespondToContactRequest, Ack),
    (SaveBuffer, BufferSaved),
    (SearchProject, SearchProjectResponse),
    (SendChannelMessage, SendChannelMessageResponse),
    (SetChannelMemberRole, Ack),
    (SetChannelVisibility, Ack),
    (ShareProject, ShareProjectResponse),
    (SynchronizeBuffers, SynchronizeBuffersResponse),
    (TaskContextForLocation, TaskContext),
    (TaskTemplates, TaskTemplatesResponse),
    (Test, Test),
    (UpdateBuffer, Ack),
    (UpdateParticipantLocation, Ack),
    (UpdateProject, Ack),
    (UpdateWorktree, Ack),
    (LspExtExpandMacro, LspExtExpandMacroResponse),
    (SetRoomParticipantRole, Ack),
    (BlameBuffer, BlameBufferResponse),
    (CreateDevServerProject, CreateDevServerProjectResponse),
    (CreateDevServer, CreateDevServerResponse),
    (ShutdownDevServer, Ack),
    (ShareDevServerProject, ShareProjectResponse),
    (JoinDevServerProject, JoinProjectResponse),
    (RejoinRemoteProjects, RejoinRemoteProjectsResponse),
    (ReconnectDevServer, ReconnectDevServerResponse),
    (ValidateDevServerProjectRequest, Ack),
    (MultiLspQuery, MultiLspQueryResponse),
    (DeleteDevServer, Ack),
    (DeleteDevServerProject, Ack),
    (RegenerateDevServerToken, RegenerateDevServerTokenResponse),
    (RenameDevServer, Ack),
    (RestartLanguageServers, Ack)
);
448
// Project-scoped messages. The leading `{project_id, ShareProject}` group is
// passed to the `entity_messages!` macro ahead of the message list; presumably
// it names the field holding the remote entity id and the associated entity
// type used to implement `EntityMessage` — TODO confirm in the macro definition.
entity_messages!(
    {project_id, ShareProject},
    AddProjectCollaborator,
    ApplyCodeAction,
    ApplyCompletionAdditionalEdits,
    BlameBuffer,
    BufferReloaded,
    BufferSaved,
    CopyProjectEntry,
    CreateBufferForPeer,
    CreateProjectEntry,
    DeleteProjectEntry,
    ExpandProjectEntry,
    FormatBuffers,
    GetCodeActions,
    GetCompletions,
    GetDefinition,
    GetImplementation,
    GetDocumentHighlights,
    GetHover,
    GetProjectSymbols,
    GetReferences,
    GetTypeDefinition,
    InlayHints,
    JoinProject,
    LeaveProject,
    MultiLspQuery,
    RestartLanguageServers,
    OnTypeFormatting,
    OpenNewBuffer,
    OpenBufferById,
    OpenBufferByPath,
    OpenBufferForSymbol,
    PerformRename,
    PrepareRename,
    RefreshInlayHints,
    ReloadBuffers,
    RemoveProjectCollaborator,
    RenameProjectEntry,
    ResolveCompletionDocumentation,
    ResolveInlayHint,
    SaveBuffer,
    SearchProject,
    StartLanguageServer,
    SynchronizeBuffers,
    TaskContextForLocation,
    TaskTemplates,
    UnshareProject,
    UpdateBuffer,
    UpdateBufferFile,
    UpdateDiagnosticSummary,
    UpdateDiffBase,
    UpdateLanguageServer,
    UpdateProject,
    UpdateProjectCollaborator,
    UpdateWorktree,
    UpdateWorktreeSettings,
    LspExtExpandMacro,
);
508
// Channel-scoped messages, registered with the `{channel_id, Channel}` group.
// Presumably `channel_id` is the field holding the remote entity id and
// `Channel` the associated entity type — TODO confirm in the macro definition.
entity_messages!(
    {channel_id, Channel},
    ChannelMessageSent,
    ChannelMessageUpdate,
    RemoveChannelMessage,
    UpdateChannelMessage,
    UpdateChannelBuffer,
    UpdateChannelBufferCollaborators,
);
518
/// Number of bytes in one kibibyte.
const KIB: usize = 1024;
/// Number of bytes in one mebibyte.
const MIB: usize = KIB * 1024;
/// Capacity (in bytes) that `MessageStream`'s scratch buffer is shrunk back to
/// after each message, so one large message does not pin a large allocation.
const MAX_BUFFER_LEN: usize = MIB;
522
/// A stream of protobuf messages.
///
/// Wraps an underlying transport `S` and converts between `Message` values and
/// WebSocket frames; envelopes travel as zstd-compressed binary frames.
pub struct MessageStream<S> {
    /// The wrapped transport (a sink and/or stream of WebSocket messages).
    stream: S,
    /// Scratch buffer holding the uncompressed protobuf bytes of the message
    /// currently being written or read; reused across calls.
    encoding_buffer: Vec<u8>,
}
528
/// A unit of traffic on a `MessageStream`.
// `Envelope` is much larger than the unit variants; the size lint is silenced
// rather than boxing the payload.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum Message {
    /// A protobuf `Envelope`, carried as a binary WebSocket frame.
    Envelope(Envelope),
    /// A WebSocket ping frame.
    Ping,
    /// A WebSocket pong frame.
    Pong,
}
536
537impl<S> MessageStream<S> {
538 pub fn new(stream: S) -> Self {
539 Self {
540 stream,
541 encoding_buffer: Vec::new(),
542 }
543 }
544
545 pub fn inner_mut(&mut self) -> &mut S {
546 &mut self.stream
547 }
548}
549
550impl<S> MessageStream<S>
551where
552 S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
553{
554 pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
555 #[cfg(any(test, feature = "test-support"))]
556 const COMPRESSION_LEVEL: i32 = -7;
557
558 #[cfg(not(any(test, feature = "test-support")))]
559 const COMPRESSION_LEVEL: i32 = 4;
560
561 match message {
562 Message::Envelope(message) => {
563 self.encoding_buffer.reserve(message.encoded_len());
564 message
565 .encode(&mut self.encoding_buffer)
566 .map_err(io::Error::from)?;
567 let buffer =
568 zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
569 .unwrap();
570
571 self.encoding_buffer.clear();
572 self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
573 self.stream.send(WebSocketMessage::Binary(buffer)).await?;
574 }
575 Message::Ping => {
576 self.stream
577 .send(WebSocketMessage::Ping(Default::default()))
578 .await?;
579 }
580 Message::Pong => {
581 self.stream
582 .send(WebSocketMessage::Pong(Default::default()))
583 .await?;
584 }
585 }
586
587 Ok(())
588 }
589}
590
591impl<S> MessageStream<S>
592where
593 S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
594{
595 pub async fn read(&mut self) -> Result<(Message, Instant), anyhow::Error> {
596 while let Some(bytes) = self.stream.next().await {
597 let received_at = Instant::now();
598 match bytes? {
599 WebSocketMessage::Binary(bytes) => {
600 zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
601 let envelope = Envelope::decode(self.encoding_buffer.as_slice())
602 .map_err(io::Error::from)?;
603
604 self.encoding_buffer.clear();
605 self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
606 return Ok((Message::Envelope(envelope), received_at));
607 }
608 WebSocketMessage::Ping(_) => return Ok((Message::Ping, received_at)),
609 WebSocketMessage::Pong(_) => return Ok((Message::Pong, received_at)),
610 WebSocketMessage::Close(_) => break,
611 _ => {}
612 }
613 }
614 Err(anyhow!("connection closed"))
615 }
616}
617
618impl From<Timestamp> for SystemTime {
619 fn from(val: Timestamp) -> Self {
620 UNIX_EPOCH
621 .checked_add(Duration::new(val.seconds, val.nanos))
622 .unwrap()
623 }
624}
625
626impl From<SystemTime> for Timestamp {
627 fn from(time: SystemTime) -> Self {
628 let duration = time.duration_since(UNIX_EPOCH).unwrap();
629 Self {
630 seconds: duration.as_secs(),
631 nanos: duration.subsec_nanos(),
632 }
633 }
634}
635
636impl From<u128> for Nonce {
637 fn from(nonce: u128) -> Self {
638 let upper_half = (nonce >> 64) as u64;
639 let lower_half = nonce as u64;
640 Self {
641 upper_half,
642 lower_half,
643 }
644 }
645}
646
647impl From<Nonce> for u128 {
648 fn from(nonce: Nonce) -> Self {
649 let upper_half = (nonce.upper_half as u128) << 64;
650 let lower_half = nonce.lower_half as u128;
651 upper_half | lower_half
652 }
653}
654
655pub fn split_worktree_update(
656 mut message: UpdateWorktree,
657 max_chunk_size: usize,
658) -> impl Iterator<Item = UpdateWorktree> {
659 let mut done_files = false;
660
661 let mut repository_map = message
662 .updated_repositories
663 .into_iter()
664 .map(|repo| (repo.work_directory_id, repo))
665 .collect::<HashMap<_, _>>();
666
667 iter::from_fn(move || {
668 if done_files {
669 return None;
670 }
671
672 let updated_entries_chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
673 let updated_entries: Vec<_> = message
674 .updated_entries
675 .drain(..updated_entries_chunk_size)
676 .collect();
677
678 let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size);
679 let removed_entries = message
680 .removed_entries
681 .drain(..removed_entries_chunk_size)
682 .collect();
683
684 done_files = message.updated_entries.is_empty() && message.removed_entries.is_empty();
685
686 let mut updated_repositories = Vec::new();
687
688 if !repository_map.is_empty() {
689 for entry in &updated_entries {
690 if let Some(repo) = repository_map.remove(&entry.id) {
691 updated_repositories.push(repo)
692 }
693 }
694 }
695
696 let removed_repositories = if done_files {
697 mem::take(&mut message.removed_repositories)
698 } else {
699 Default::default()
700 };
701
702 if done_files {
703 updated_repositories.extend(mem::take(&mut repository_map).into_values());
704 }
705
706 Some(UpdateWorktree {
707 project_id: message.project_id,
708 worktree_id: message.worktree_id,
709 root_name: message.root_name.clone(),
710 abs_path: message.abs_path.clone(),
711 updated_entries,
712 removed_entries,
713 scan_id: message.scan_id,
714 is_last_update: done_files && message.is_last_update,
715 updated_repositories,
716 removed_repositories,
717 })
718 })
719}
720
#[cfg(test)]
mod tests {
    use super::*;

    // Checks that the scratch buffer's capacity stays within `MAX_BUFFER_LEN`
    // after writing and reading both a small and a very large envelope.
    #[gpui::test]
    async fn test_buffer_size() {
        let (tx, rx) = futures::channel::mpsc::unbounded();
        let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
        // Small message: 70 bytes of root name.
        sink.write(Message::Envelope(Envelope {
            payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
                root_name: "abcdefg".repeat(10),
                ..Default::default()
            })),
            ..Default::default()
        }))
        .await
        .unwrap();
        assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
        // Large message: 7,000,000 bytes of root name, well above the cap.
        sink.write(Message::Envelope(Envelope {
            payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
                root_name: "abcdefg".repeat(1000000),
                ..Default::default()
            })),
            ..Default::default()
        }))
        .await
        .unwrap();
        assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);

        // Read both messages back through the receiving half of the channel.
        let mut stream = MessageStream::new(rx.map(anyhow::Ok));
        stream.read().await.unwrap();
        assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
        stream.read().await.unwrap();
        assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
    }

    // Round-trips `PeerId` through its packed `u64` form, including the
    // extreme values of each 32-bit half.
    #[gpui::test]
    fn test_converting_peer_id_from_and_to_u64() {
        let peer_id = PeerId {
            owner_id: 10,
            id: 3,
        };
        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
        let peer_id = PeerId {
            owner_id: u32::MAX,
            id: 3,
        };
        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
        let peer_id = PeerId {
            owner_id: 10,
            id: u32::MAX,
        };
        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
        let peer_id = PeerId {
            owner_id: u32::MAX,
            id: u32::MAX,
        };
        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
    }
}