1#![allow(non_snake_case)]
2
3use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
4use anyhow::{anyhow, Result};
5use async_tungstenite::tungstenite::Message as WebSocketMessage;
6use collections::HashMap;
7use futures::{SinkExt as _, StreamExt as _};
8use prost::Message as _;
9use serde::Serialize;
10use std::any::{Any, TypeId};
11use std::time::Instant;
12use std::{
13 cmp,
14 fmt::Debug,
15 io, iter,
16 time::{Duration, SystemTime, UNIX_EPOCH},
17};
18use std::{fmt, mem};
19
20include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
21
22pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
23 const NAME: &'static str;
24 const PRIORITY: MessagePriority;
25 fn into_envelope(
26 self,
27 id: u32,
28 responding_to: Option<u32>,
29 original_sender_id: Option<PeerId>,
30 ) -> Envelope;
31 fn from_envelope(envelope: Envelope) -> Option<Self>;
32}
33
34pub trait EntityMessage: EnvelopedMessage {
35 type Entity;
36 fn remote_entity_id(&self) -> u64;
37}
38
39pub trait RequestMessage: EnvelopedMessage {
40 type Response: EnvelopedMessage;
41}
42
43pub trait AnyTypedEnvelope: 'static + Send + Sync {
44 fn payload_type_id(&self) -> TypeId;
45 fn payload_type_name(&self) -> &'static str;
46 fn as_any(&self) -> &dyn Any;
47 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
48 fn is_background(&self) -> bool;
49 fn original_sender_id(&self) -> Option<PeerId>;
50 fn sender_id(&self) -> ConnectionId;
51 fn message_id(&self) -> u32;
52}
53
54pub enum MessagePriority {
55 Foreground,
56 Background,
57}
58
59impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
60 fn payload_type_id(&self) -> TypeId {
61 TypeId::of::<T>()
62 }
63
64 fn payload_type_name(&self) -> &'static str {
65 T::NAME
66 }
67
68 fn as_any(&self) -> &dyn Any {
69 self
70 }
71
72 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
73 self
74 }
75
76 fn is_background(&self) -> bool {
77 matches!(T::PRIORITY, MessagePriority::Background)
78 }
79
80 fn original_sender_id(&self) -> Option<PeerId> {
81 self.original_sender_id
82 }
83
84 fn sender_id(&self) -> ConnectionId {
85 self.sender_id
86 }
87
88 fn message_id(&self) -> u32 {
89 self.message_id
90 }
91}
92
93impl PeerId {
94 pub fn from_u64(peer_id: u64) -> Self {
95 let owner_id = (peer_id >> 32) as u32;
96 let id = peer_id as u32;
97 Self { owner_id, id }
98 }
99
100 pub fn as_u64(self) -> u64 {
101 ((self.owner_id as u64) << 32) | (self.id as u64)
102 }
103}
104
105impl Copy for PeerId {}
106
107impl Eq for PeerId {}
108
109impl Ord for PeerId {
110 fn cmp(&self, other: &Self) -> cmp::Ordering {
111 self.owner_id
112 .cmp(&other.owner_id)
113 .then_with(|| self.id.cmp(&other.id))
114 }
115}
116
117impl PartialOrd for PeerId {
118 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
119 Some(self.cmp(other))
120 }
121}
122
123impl std::hash::Hash for PeerId {
124 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
125 self.owner_id.hash(state);
126 self.id.hash(state);
127 }
128}
129
130impl fmt::Display for PeerId {
131 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
132 write!(f, "{}/{}", self.owner_id, self.id)
133 }
134}
135
136messages!(
137 (Ack, Foreground),
138 (AckBufferOperation, Background),
139 (AckChannelMessage, Background),
140 (AddNotification, Foreground),
141 (AddProjectCollaborator, Foreground),
142 (ApplyCodeAction, Background),
143 (ApplyCodeActionResponse, Background),
144 (ApplyCompletionAdditionalEdits, Background),
145 (ApplyCompletionAdditionalEditsResponse, Background),
146 (BufferReloaded, Foreground),
147 (BufferSaved, Foreground),
148 (Call, Foreground),
149 (CallCanceled, Foreground),
150 (CancelCall, Foreground),
151 (ChannelMessageSent, Foreground),
152 (ChannelMessageUpdate, Foreground),
153 (CompleteWithLanguageModel, Background),
154 (ComputeEmbeddings, Background),
155 (ComputeEmbeddingsResponse, Background),
156 (CopyProjectEntry, Foreground),
157 (CountTokensWithLanguageModel, Background),
158 (CountTokensResponse, Background),
159 (CreateBufferForPeer, Foreground),
160 (CreateChannel, Foreground),
161 (CreateChannelResponse, Foreground),
162 (CreateProjectEntry, Foreground),
163 (CreateRoom, Foreground),
164 (CreateRoomResponse, Foreground),
165 (DeclineCall, Foreground),
166 (DeleteChannel, Foreground),
167 (DeleteNotification, Foreground),
168 (UpdateNotification, Foreground),
169 (DeleteProjectEntry, Foreground),
170 (EndStream, Foreground),
171 (Error, Foreground),
172 (ExpandProjectEntry, Foreground),
173 (ExpandProjectEntryResponse, Foreground),
174 (Follow, Foreground),
175 (FollowResponse, Foreground),
176 (FormatBuffers, Foreground),
177 (FormatBuffersResponse, Foreground),
178 (FuzzySearchUsers, Foreground),
179 (GetCachedEmbeddings, Background),
180 (GetCachedEmbeddingsResponse, Background),
181 (GetChannelMembers, Foreground),
182 (GetChannelMembersResponse, Foreground),
183 (GetChannelMessages, Background),
184 (GetChannelMessagesById, Background),
185 (GetChannelMessagesResponse, Background),
186 (GetCodeActions, Background),
187 (GetCodeActionsResponse, Background),
188 (GetCompletions, Background),
189 (GetCompletionsResponse, Background),
190 (GetDefinition, Background),
191 (GetDefinitionResponse, Background),
192 (GetDocumentHighlights, Background),
193 (GetDocumentHighlightsResponse, Background),
194 (GetHover, Background),
195 (GetHoverResponse, Background),
196 (GetNotifications, Foreground),
197 (GetNotificationsResponse, Foreground),
198 (GetPrivateUserInfo, Foreground),
199 (GetPrivateUserInfoResponse, Foreground),
200 (GetProjectSymbols, Background),
201 (GetProjectSymbolsResponse, Background),
202 (GetReferences, Background),
203 (GetReferencesResponse, Background),
204 (GetSupermavenApiKey, Background),
205 (GetSupermavenApiKeyResponse, Background),
206 (GetTypeDefinition, Background),
207 (GetTypeDefinitionResponse, Background),
208 (GetImplementation, Background),
209 (GetImplementationResponse, Background),
210 (GetUsers, Foreground),
211 (Hello, Foreground),
212 (IncomingCall, Foreground),
213 (InlayHints, Background),
214 (InlayHintsResponse, Background),
215 (InviteChannelMember, Foreground),
216 (JoinChannel, Foreground),
217 (JoinChannelBuffer, Foreground),
218 (JoinChannelBufferResponse, Foreground),
219 (JoinChannelChat, Foreground),
220 (JoinChannelChatResponse, Foreground),
221 (JoinProject, Foreground),
222 (JoinHostedProject, Foreground),
223 (JoinProjectResponse, Foreground),
224 (JoinRoom, Foreground),
225 (JoinRoomResponse, Foreground),
226 (LanguageModelResponse, Background),
227 (LeaveChannelBuffer, Background),
228 (LeaveChannelChat, Foreground),
229 (LeaveProject, Foreground),
230 (LeaveRoom, Foreground),
231 (MarkNotificationRead, Foreground),
232 (MoveChannel, Foreground),
233 (OnTypeFormatting, Background),
234 (OnTypeFormattingResponse, Background),
235 (OpenBufferById, Background),
236 (OpenBufferByPath, Background),
237 (OpenBufferForSymbol, Background),
238 (OpenBufferForSymbolResponse, Background),
239 (OpenBufferResponse, Background),
240 (PerformRename, Background),
241 (PerformRenameResponse, Background),
242 (Ping, Foreground),
243 (PrepareRename, Background),
244 (PrepareRenameResponse, Background),
245 (ProjectEntryResponse, Foreground),
246 (RefreshInlayHints, Foreground),
247 (RejoinChannelBuffers, Foreground),
248 (RejoinChannelBuffersResponse, Foreground),
249 (RejoinRoom, Foreground),
250 (RejoinRoomResponse, Foreground),
251 (ReloadBuffers, Foreground),
252 (ReloadBuffersResponse, Foreground),
253 (RemoveChannelMember, Foreground),
254 (RemoveChannelMessage, Foreground),
255 (UpdateChannelMessage, Foreground),
256 (RemoveContact, Foreground),
257 (RemoveProjectCollaborator, Foreground),
258 (RenameChannel, Foreground),
259 (RenameChannelResponse, Foreground),
260 (RenameProjectEntry, Foreground),
261 (RequestContact, Foreground),
262 (ResolveCompletionDocumentation, Background),
263 (ResolveCompletionDocumentationResponse, Background),
264 (ResolveInlayHint, Background),
265 (ResolveInlayHintResponse, Background),
266 (RespondToChannelInvite, Foreground),
267 (RespondToContactRequest, Foreground),
268 (RoomUpdated, Foreground),
269 (SaveBuffer, Foreground),
270 (SetChannelMemberRole, Foreground),
271 (SetChannelVisibility, Foreground),
272 (SearchProject, Background),
273 (SearchProjectResponse, Background),
274 (SendChannelMessage, Background),
275 (SendChannelMessageResponse, Background),
276 (ShareProject, Foreground),
277 (ShareProjectResponse, Foreground),
278 (ShowContacts, Foreground),
279 (StartLanguageServer, Foreground),
280 (SubscribeToChannels, Foreground),
281 (SynchronizeBuffers, Foreground),
282 (SynchronizeBuffersResponse, Foreground),
283 (TaskContextForLocation, Background),
284 (TaskContext, Background),
285 (TaskTemplates, Background),
286 (TaskTemplatesResponse, Background),
287 (Test, Foreground),
288 (Unfollow, Foreground),
289 (UnshareProject, Foreground),
290 (UpdateBuffer, Foreground),
291 (UpdateBufferFile, Foreground),
292 (UpdateChannelBuffer, Foreground),
293 (UpdateChannelBufferCollaborators, Foreground),
294 (UpdateChannels, Foreground),
295 (UpdateUserChannels, Foreground),
296 (UpdateContacts, Foreground),
297 (UpdateDiagnosticSummary, Foreground),
298 (UpdateDiffBase, Foreground),
299 (UpdateFollowers, Foreground),
300 (UpdateInviteInfo, Foreground),
301 (UpdateLanguageServer, Foreground),
302 (UpdateParticipantLocation, Foreground),
303 (UpdateProject, Foreground),
304 (UpdateProjectCollaborator, Foreground),
305 (UpdateWorktree, Foreground),
306 (UpdateWorktreeSettings, Foreground),
307 (UsersResponse, Foreground),
308 (LspExtExpandMacro, Background),
309 (LspExtExpandMacroResponse, Background),
310 (SetRoomParticipantRole, Foreground),
311 (BlameBuffer, Foreground),
312 (BlameBufferResponse, Foreground),
313 (CreateDevServerProject, Background),
314 (CreateDevServerProjectResponse, Foreground),
315 (CreateDevServer, Foreground),
316 (CreateDevServerResponse, Foreground),
317 (DevServerInstructions, Foreground),
318 (ShutdownDevServer, Foreground),
319 (ReconnectDevServer, Foreground),
320 (ReconnectDevServerResponse, Foreground),
321 (ShareDevServerProject, Foreground),
322 (JoinDevServerProject, Foreground),
323 (RejoinRemoteProjects, Foreground),
324 (RejoinRemoteProjectsResponse, Foreground),
325 (MultiLspQuery, Background),
326 (MultiLspQueryResponse, Background),
327 (DevServerProjectsUpdate, Foreground),
328 (ValidateDevServerProjectRequest, Background),
329 (DeleteDevServer, Foreground),
330 (DeleteDevServerProject, Foreground),
331 (RegenerateDevServerToken, Foreground),
332 (RegenerateDevServerTokenResponse, Foreground),
333 (RenameDevServer, Foreground),
334 (OpenNewBuffer, Foreground),
335);
336
337request_messages!(
338 (ApplyCodeAction, ApplyCodeActionResponse),
339 (
340 ApplyCompletionAdditionalEdits,
341 ApplyCompletionAdditionalEditsResponse
342 ),
343 (Call, Ack),
344 (CancelCall, Ack),
345 (CopyProjectEntry, ProjectEntryResponse),
346 (CompleteWithLanguageModel, LanguageModelResponse),
347 (ComputeEmbeddings, ComputeEmbeddingsResponse),
348 (CountTokensWithLanguageModel, CountTokensResponse),
349 (CreateChannel, CreateChannelResponse),
350 (CreateProjectEntry, ProjectEntryResponse),
351 (CreateRoom, CreateRoomResponse),
352 (DeclineCall, Ack),
353 (DeleteChannel, Ack),
354 (DeleteProjectEntry, ProjectEntryResponse),
355 (ExpandProjectEntry, ExpandProjectEntryResponse),
356 (Follow, FollowResponse),
357 (FormatBuffers, FormatBuffersResponse),
358 (FuzzySearchUsers, UsersResponse),
359 (GetCachedEmbeddings, GetCachedEmbeddingsResponse),
360 (GetChannelMembers, GetChannelMembersResponse),
361 (GetChannelMessages, GetChannelMessagesResponse),
362 (GetChannelMessagesById, GetChannelMessagesResponse),
363 (GetCodeActions, GetCodeActionsResponse),
364 (GetCompletions, GetCompletionsResponse),
365 (GetDefinition, GetDefinitionResponse),
366 (GetImplementation, GetImplementationResponse),
367 (GetDocumentHighlights, GetDocumentHighlightsResponse),
368 (GetHover, GetHoverResponse),
369 (GetNotifications, GetNotificationsResponse),
370 (GetPrivateUserInfo, GetPrivateUserInfoResponse),
371 (GetProjectSymbols, GetProjectSymbolsResponse),
372 (GetReferences, GetReferencesResponse),
373 (GetSupermavenApiKey, GetSupermavenApiKeyResponse),
374 (GetTypeDefinition, GetTypeDefinitionResponse),
375 (GetUsers, UsersResponse),
376 (IncomingCall, Ack),
377 (InlayHints, InlayHintsResponse),
378 (InviteChannelMember, Ack),
379 (JoinChannel, JoinRoomResponse),
380 (JoinChannelBuffer, JoinChannelBufferResponse),
381 (JoinChannelChat, JoinChannelChatResponse),
382 (JoinHostedProject, JoinProjectResponse),
383 (JoinProject, JoinProjectResponse),
384 (JoinRoom, JoinRoomResponse),
385 (LeaveChannelBuffer, Ack),
386 (LeaveRoom, Ack),
387 (MarkNotificationRead, Ack),
388 (MoveChannel, Ack),
389 (OnTypeFormatting, OnTypeFormattingResponse),
390 (OpenBufferById, OpenBufferResponse),
391 (OpenBufferByPath, OpenBufferResponse),
392 (OpenBufferForSymbol, OpenBufferForSymbolResponse),
393 (OpenNewBuffer, OpenBufferResponse),
394 (PerformRename, PerformRenameResponse),
395 (Ping, Ack),
396 (PrepareRename, PrepareRenameResponse),
397 (RefreshInlayHints, Ack),
398 (RejoinChannelBuffers, RejoinChannelBuffersResponse),
399 (RejoinRoom, RejoinRoomResponse),
400 (ReloadBuffers, ReloadBuffersResponse),
401 (RemoveChannelMember, Ack),
402 (RemoveChannelMessage, Ack),
403 (UpdateChannelMessage, Ack),
404 (RemoveContact, Ack),
405 (RenameChannel, RenameChannelResponse),
406 (RenameProjectEntry, ProjectEntryResponse),
407 (RequestContact, Ack),
408 (
409 ResolveCompletionDocumentation,
410 ResolveCompletionDocumentationResponse
411 ),
412 (ResolveInlayHint, ResolveInlayHintResponse),
413 (RespondToChannelInvite, Ack),
414 (RespondToContactRequest, Ack),
415 (SaveBuffer, BufferSaved),
416 (SearchProject, SearchProjectResponse),
417 (SendChannelMessage, SendChannelMessageResponse),
418 (SetChannelMemberRole, Ack),
419 (SetChannelVisibility, Ack),
420 (ShareProject, ShareProjectResponse),
421 (SynchronizeBuffers, SynchronizeBuffersResponse),
422 (TaskContextForLocation, TaskContext),
423 (TaskTemplates, TaskTemplatesResponse),
424 (Test, Test),
425 (UpdateBuffer, Ack),
426 (UpdateParticipantLocation, Ack),
427 (UpdateProject, Ack),
428 (UpdateWorktree, Ack),
429 (LspExtExpandMacro, LspExtExpandMacroResponse),
430 (SetRoomParticipantRole, Ack),
431 (BlameBuffer, BlameBufferResponse),
432 (CreateDevServerProject, CreateDevServerProjectResponse),
433 (CreateDevServer, CreateDevServerResponse),
434 (ShutdownDevServer, Ack),
435 (ShareDevServerProject, ShareProjectResponse),
436 (JoinDevServerProject, JoinProjectResponse),
437 (RejoinRemoteProjects, RejoinRemoteProjectsResponse),
438 (ReconnectDevServer, ReconnectDevServerResponse),
439 (ValidateDevServerProjectRequest, Ack),
440 (MultiLspQuery, MultiLspQueryResponse),
441 (DeleteDevServer, Ack),
442 (DeleteDevServerProject, Ack),
443 (RegenerateDevServerToken, RegenerateDevServerTokenResponse),
444 (RenameDevServer, Ack)
445);
446
447entity_messages!(
448 {project_id, ShareProject},
449 AddProjectCollaborator,
450 ApplyCodeAction,
451 ApplyCompletionAdditionalEdits,
452 BlameBuffer,
453 BufferReloaded,
454 BufferSaved,
455 CopyProjectEntry,
456 CreateBufferForPeer,
457 CreateProjectEntry,
458 DeleteProjectEntry,
459 ExpandProjectEntry,
460 FormatBuffers,
461 GetCodeActions,
462 GetCompletions,
463 GetDefinition,
464 GetImplementation,
465 GetDocumentHighlights,
466 GetHover,
467 GetProjectSymbols,
468 GetReferences,
469 GetTypeDefinition,
470 InlayHints,
471 JoinProject,
472 LeaveProject,
473 MultiLspQuery,
474 OnTypeFormatting,
475 OpenNewBuffer,
476 OpenBufferById,
477 OpenBufferByPath,
478 OpenBufferForSymbol,
479 PerformRename,
480 PrepareRename,
481 RefreshInlayHints,
482 ReloadBuffers,
483 RemoveProjectCollaborator,
484 RenameProjectEntry,
485 ResolveCompletionDocumentation,
486 ResolveInlayHint,
487 SaveBuffer,
488 SearchProject,
489 StartLanguageServer,
490 SynchronizeBuffers,
491 TaskContextForLocation,
492 TaskTemplates,
493 UnshareProject,
494 UpdateBuffer,
495 UpdateBufferFile,
496 UpdateDiagnosticSummary,
497 UpdateDiffBase,
498 UpdateLanguageServer,
499 UpdateProject,
500 UpdateProjectCollaborator,
501 UpdateWorktree,
502 UpdateWorktreeSettings,
503 LspExtExpandMacro,
504);
505
506entity_messages!(
507 {channel_id, Channel},
508 ChannelMessageSent,
509 ChannelMessageUpdate,
510 RemoveChannelMessage,
511 UpdateChannelMessage,
512 UpdateChannelBuffer,
513 UpdateChannelBufferCollaborators,
514);
515
516const KIB: usize = 1024;
517const MIB: usize = KIB * 1024;
518const MAX_BUFFER_LEN: usize = MIB;
519
520/// A stream of protobuf messages.
521pub struct MessageStream<S> {
522 stream: S,
523 encoding_buffer: Vec<u8>,
524}
525
526#[allow(clippy::large_enum_variant)]
527#[derive(Debug)]
528pub enum Message {
529 Envelope(Envelope),
530 Ping,
531 Pong,
532}
533
534impl<S> MessageStream<S> {
535 pub fn new(stream: S) -> Self {
536 Self {
537 stream,
538 encoding_buffer: Vec::new(),
539 }
540 }
541
542 pub fn inner_mut(&mut self) -> &mut S {
543 &mut self.stream
544 }
545}
546
547impl<S> MessageStream<S>
548where
549 S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
550{
551 pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
552 #[cfg(any(test, feature = "test-support"))]
553 const COMPRESSION_LEVEL: i32 = -7;
554
555 #[cfg(not(any(test, feature = "test-support")))]
556 const COMPRESSION_LEVEL: i32 = 4;
557
558 match message {
559 Message::Envelope(message) => {
560 self.encoding_buffer.reserve(message.encoded_len());
561 message
562 .encode(&mut self.encoding_buffer)
563 .map_err(io::Error::from)?;
564 let buffer =
565 zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
566 .unwrap();
567
568 self.encoding_buffer.clear();
569 self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
570 self.stream.send(WebSocketMessage::Binary(buffer)).await?;
571 }
572 Message::Ping => {
573 self.stream
574 .send(WebSocketMessage::Ping(Default::default()))
575 .await?;
576 }
577 Message::Pong => {
578 self.stream
579 .send(WebSocketMessage::Pong(Default::default()))
580 .await?;
581 }
582 }
583
584 Ok(())
585 }
586}
587
588impl<S> MessageStream<S>
589where
590 S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
591{
592 pub async fn read(&mut self) -> Result<(Message, Instant), anyhow::Error> {
593 while let Some(bytes) = self.stream.next().await {
594 let received_at = Instant::now();
595 match bytes? {
596 WebSocketMessage::Binary(bytes) => {
597 zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
598 let envelope = Envelope::decode(self.encoding_buffer.as_slice())
599 .map_err(io::Error::from)?;
600
601 self.encoding_buffer.clear();
602 self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
603 return Ok((Message::Envelope(envelope), received_at));
604 }
605 WebSocketMessage::Ping(_) => return Ok((Message::Ping, received_at)),
606 WebSocketMessage::Pong(_) => return Ok((Message::Pong, received_at)),
607 WebSocketMessage::Close(_) => break,
608 _ => {}
609 }
610 }
611 Err(anyhow!("connection closed"))
612 }
613}
614
615impl From<Timestamp> for SystemTime {
616 fn from(val: Timestamp) -> Self {
617 UNIX_EPOCH
618 .checked_add(Duration::new(val.seconds, val.nanos))
619 .unwrap()
620 }
621}
622
623impl From<SystemTime> for Timestamp {
624 fn from(time: SystemTime) -> Self {
625 let duration = time.duration_since(UNIX_EPOCH).unwrap();
626 Self {
627 seconds: duration.as_secs(),
628 nanos: duration.subsec_nanos(),
629 }
630 }
631}
632
633impl From<u128> for Nonce {
634 fn from(nonce: u128) -> Self {
635 let upper_half = (nonce >> 64) as u64;
636 let lower_half = nonce as u64;
637 Self {
638 upper_half,
639 lower_half,
640 }
641 }
642}
643
644impl From<Nonce> for u128 {
645 fn from(nonce: Nonce) -> Self {
646 let upper_half = (nonce.upper_half as u128) << 64;
647 let lower_half = nonce.lower_half as u128;
648 upper_half | lower_half
649 }
650}
651
652pub fn split_worktree_update(
653 mut message: UpdateWorktree,
654 max_chunk_size: usize,
655) -> impl Iterator<Item = UpdateWorktree> {
656 let mut done_files = false;
657
658 let mut repository_map = message
659 .updated_repositories
660 .into_iter()
661 .map(|repo| (repo.work_directory_id, repo))
662 .collect::<HashMap<_, _>>();
663
664 iter::from_fn(move || {
665 if done_files {
666 return None;
667 }
668
669 let updated_entries_chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
670 let updated_entries: Vec<_> = message
671 .updated_entries
672 .drain(..updated_entries_chunk_size)
673 .collect();
674
675 let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size);
676 let removed_entries = message
677 .removed_entries
678 .drain(..removed_entries_chunk_size)
679 .collect();
680
681 done_files = message.updated_entries.is_empty() && message.removed_entries.is_empty();
682
683 let mut updated_repositories = Vec::new();
684
685 if !repository_map.is_empty() {
686 for entry in &updated_entries {
687 if let Some(repo) = repository_map.remove(&entry.id) {
688 updated_repositories.push(repo)
689 }
690 }
691 }
692
693 let removed_repositories = if done_files {
694 mem::take(&mut message.removed_repositories)
695 } else {
696 Default::default()
697 };
698
699 if done_files {
700 updated_repositories.extend(mem::take(&mut repository_map).into_values());
701 }
702
703 Some(UpdateWorktree {
704 project_id: message.project_id,
705 worktree_id: message.worktree_id,
706 root_name: message.root_name.clone(),
707 abs_path: message.abs_path.clone(),
708 updated_entries,
709 removed_entries,
710 scan_id: message.scan_id,
711 is_last_update: done_files && message.is_last_update,
712 updated_repositories,
713 removed_repositories,
714 })
715 })
716}
717
718#[cfg(test)]
719mod tests {
720 use super::*;
721
722 #[gpui::test]
723 async fn test_buffer_size() {
724 let (tx, rx) = futures::channel::mpsc::unbounded();
725 let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
726 sink.write(Message::Envelope(Envelope {
727 payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
728 root_name: "abcdefg".repeat(10),
729 ..Default::default()
730 })),
731 ..Default::default()
732 }))
733 .await
734 .unwrap();
735 assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
736 sink.write(Message::Envelope(Envelope {
737 payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
738 root_name: "abcdefg".repeat(1000000),
739 ..Default::default()
740 })),
741 ..Default::default()
742 }))
743 .await
744 .unwrap();
745 assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
746
747 let mut stream = MessageStream::new(rx.map(anyhow::Ok));
748 stream.read().await.unwrap();
749 assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
750 stream.read().await.unwrap();
751 assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
752 }
753
754 #[gpui::test]
755 fn test_converting_peer_id_from_and_to_u64() {
756 let peer_id = PeerId {
757 owner_id: 10,
758 id: 3,
759 };
760 assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
761 let peer_id = PeerId {
762 owner_id: u32::MAX,
763 id: 3,
764 };
765 assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
766 let peer_id = PeerId {
767 owner_id: 10,
768 id: u32::MAX,
769 };
770 assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
771 let peer_id = PeerId {
772 owner_id: u32::MAX,
773 id: u32::MAX,
774 };
775 assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
776 }
777}