proto.rs

  1#![allow(non_snake_case)]
  2
  3use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
  4use anyhow::{anyhow, Result};
  5use async_tungstenite::tungstenite::Message as WebSocketMessage;
  6use collections::HashMap;
  7use futures::{SinkExt as _, StreamExt as _};
  8use prost::Message as _;
  9use serde::Serialize;
 10use std::any::{Any, TypeId};
 11use std::time::Instant;
 12use std::{
 13    cmp,
 14    fmt::Debug,
 15    io, iter,
 16    time::{Duration, SystemTime, UNIX_EPOCH},
 17};
 18use std::{fmt, mem};
 19
 20include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
 21
 22pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
 23    const NAME: &'static str;
 24    const PRIORITY: MessagePriority;
 25    fn into_envelope(
 26        self,
 27        id: u32,
 28        responding_to: Option<u32>,
 29        original_sender_id: Option<PeerId>,
 30    ) -> Envelope;
 31    fn from_envelope(envelope: Envelope) -> Option<Self>;
 32}
 33
 34pub trait EntityMessage: EnvelopedMessage {
 35    type Entity;
 36    fn remote_entity_id(&self) -> u64;
 37}
 38
 39pub trait RequestMessage: EnvelopedMessage {
 40    type Response: EnvelopedMessage;
 41}
 42
 43pub trait AnyTypedEnvelope: 'static + Send + Sync {
 44    fn payload_type_id(&self) -> TypeId;
 45    fn payload_type_name(&self) -> &'static str;
 46    fn as_any(&self) -> &dyn Any;
 47    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
 48    fn is_background(&self) -> bool;
 49    fn original_sender_id(&self) -> Option<PeerId>;
 50    fn sender_id(&self) -> ConnectionId;
 51    fn message_id(&self) -> u32;
 52}
 53
 54pub enum MessagePriority {
 55    Foreground,
 56    Background,
 57}
 58
 59impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
 60    fn payload_type_id(&self) -> TypeId {
 61        TypeId::of::<T>()
 62    }
 63
 64    fn payload_type_name(&self) -> &'static str {
 65        T::NAME
 66    }
 67
 68    fn as_any(&self) -> &dyn Any {
 69        self
 70    }
 71
 72    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
 73        self
 74    }
 75
 76    fn is_background(&self) -> bool {
 77        matches!(T::PRIORITY, MessagePriority::Background)
 78    }
 79
 80    fn original_sender_id(&self) -> Option<PeerId> {
 81        self.original_sender_id
 82    }
 83
 84    fn sender_id(&self) -> ConnectionId {
 85        self.sender_id
 86    }
 87
 88    fn message_id(&self) -> u32 {
 89        self.message_id
 90    }
 91}
 92
 93impl PeerId {
 94    pub fn from_u64(peer_id: u64) -> Self {
 95        let owner_id = (peer_id >> 32) as u32;
 96        let id = peer_id as u32;
 97        Self { owner_id, id }
 98    }
 99
100    pub fn as_u64(self) -> u64 {
101        ((self.owner_id as u64) << 32) | (self.id as u64)
102    }
103}
104
105impl Copy for PeerId {}
106
107impl Eq for PeerId {}
108
109impl Ord for PeerId {
110    fn cmp(&self, other: &Self) -> cmp::Ordering {
111        self.owner_id
112            .cmp(&other.owner_id)
113            .then_with(|| self.id.cmp(&other.id))
114    }
115}
116
117impl PartialOrd for PeerId {
118    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
119        Some(self.cmp(other))
120    }
121}
122
123impl std::hash::Hash for PeerId {
124    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
125        self.owner_id.hash(state);
126        self.id.hash(state);
127    }
128}
129
130impl fmt::Display for PeerId {
131    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
132        write!(f, "{}/{}", self.owner_id, self.id)
133    }
134}
135
136messages!(
137    (Ack, Foreground),
138    (AckBufferOperation, Background),
139    (AckChannelMessage, Background),
140    (AddNotification, Foreground),
141    (AddProjectCollaborator, Foreground),
142    (ApplyCodeAction, Background),
143    (ApplyCodeActionResponse, Background),
144    (ApplyCompletionAdditionalEdits, Background),
145    (ApplyCompletionAdditionalEditsResponse, Background),
146    (BufferReloaded, Foreground),
147    (BufferSaved, Foreground),
148    (Call, Foreground),
149    (CallCanceled, Foreground),
150    (CancelCall, Foreground),
151    (ChannelMessageSent, Foreground),
152    (ChannelMessageUpdate, Foreground),
153    (CompleteWithLanguageModel, Background),
154    (ComputeEmbeddings, Background),
155    (ComputeEmbeddingsResponse, Background),
156    (CopyProjectEntry, Foreground),
157    (CountTokensWithLanguageModel, Background),
158    (CountTokensResponse, Background),
159    (CreateBufferForPeer, Foreground),
160    (CreateChannel, Foreground),
161    (CreateChannelResponse, Foreground),
162    (CreateProjectEntry, Foreground),
163    (CreateRoom, Foreground),
164    (CreateRoomResponse, Foreground),
165    (DeclineCall, Foreground),
166    (DeleteChannel, Foreground),
167    (DeleteNotification, Foreground),
168    (UpdateNotification, Foreground),
169    (DeleteProjectEntry, Foreground),
170    (EndStream, Foreground),
171    (Error, Foreground),
172    (ExpandProjectEntry, Foreground),
173    (ExpandProjectEntryResponse, Foreground),
174    (Follow, Foreground),
175    (FollowResponse, Foreground),
176    (FormatBuffers, Foreground),
177    (FormatBuffersResponse, Foreground),
178    (FuzzySearchUsers, Foreground),
179    (GetCachedEmbeddings, Background),
180    (GetCachedEmbeddingsResponse, Background),
181    (GetChannelMembers, Foreground),
182    (GetChannelMembersResponse, Foreground),
183    (GetChannelMessages, Background),
184    (GetChannelMessagesById, Background),
185    (GetChannelMessagesResponse, Background),
186    (GetCodeActions, Background),
187    (GetCodeActionsResponse, Background),
188    (GetCompletions, Background),
189    (GetCompletionsResponse, Background),
190    (GetDefinition, Background),
191    (GetDefinitionResponse, Background),
192    (GetDocumentHighlights, Background),
193    (GetDocumentHighlightsResponse, Background),
194    (GetHover, Background),
195    (GetHoverResponse, Background),
196    (GetNotifications, Foreground),
197    (GetNotificationsResponse, Foreground),
198    (GetPrivateUserInfo, Foreground),
199    (GetPrivateUserInfoResponse, Foreground),
200    (GetProjectSymbols, Background),
201    (GetProjectSymbolsResponse, Background),
202    (GetReferences, Background),
203    (GetReferencesResponse, Background),
204    (GetSupermavenApiKey, Background),
205    (GetSupermavenApiKeyResponse, Background),
206    (GetTypeDefinition, Background),
207    (GetTypeDefinitionResponse, Background),
208    (GetImplementation, Background),
209    (GetImplementationResponse, Background),
210    (GetUsers, Foreground),
211    (Hello, Foreground),
212    (IncomingCall, Foreground),
213    (InlayHints, Background),
214    (InlayHintsResponse, Background),
215    (InviteChannelMember, Foreground),
216    (JoinChannel, Foreground),
217    (JoinChannelBuffer, Foreground),
218    (JoinChannelBufferResponse, Foreground),
219    (JoinChannelChat, Foreground),
220    (JoinChannelChatResponse, Foreground),
221    (JoinProject, Foreground),
222    (JoinHostedProject, Foreground),
223    (JoinProjectResponse, Foreground),
224    (JoinRoom, Foreground),
225    (JoinRoomResponse, Foreground),
226    (LanguageModelResponse, Background),
227    (LeaveChannelBuffer, Background),
228    (LeaveChannelChat, Foreground),
229    (LeaveProject, Foreground),
230    (LeaveRoom, Foreground),
231    (MarkNotificationRead, Foreground),
232    (MoveChannel, Foreground),
233    (OnTypeFormatting, Background),
234    (OnTypeFormattingResponse, Background),
235    (OpenBufferById, Background),
236    (OpenBufferByPath, Background),
237    (OpenBufferForSymbol, Background),
238    (OpenBufferForSymbolResponse, Background),
239    (OpenBufferResponse, Background),
240    (PerformRename, Background),
241    (PerformRenameResponse, Background),
242    (Ping, Foreground),
243    (PrepareRename, Background),
244    (PrepareRenameResponse, Background),
245    (ProjectEntryResponse, Foreground),
246    (RefreshInlayHints, Foreground),
247    (RejoinChannelBuffers, Foreground),
248    (RejoinChannelBuffersResponse, Foreground),
249    (RejoinRoom, Foreground),
250    (RejoinRoomResponse, Foreground),
251    (ReloadBuffers, Foreground),
252    (ReloadBuffersResponse, Foreground),
253    (RemoveChannelMember, Foreground),
254    (RemoveChannelMessage, Foreground),
255    (UpdateChannelMessage, Foreground),
256    (RemoveContact, Foreground),
257    (RemoveProjectCollaborator, Foreground),
258    (RenameChannel, Foreground),
259    (RenameChannelResponse, Foreground),
260    (RenameProjectEntry, Foreground),
261    (RequestContact, Foreground),
262    (ResolveCompletionDocumentation, Background),
263    (ResolveCompletionDocumentationResponse, Background),
264    (ResolveInlayHint, Background),
265    (ResolveInlayHintResponse, Background),
266    (RespondToChannelInvite, Foreground),
267    (RespondToContactRequest, Foreground),
268    (RoomUpdated, Foreground),
269    (SaveBuffer, Foreground),
270    (SetChannelMemberRole, Foreground),
271    (SetChannelVisibility, Foreground),
272    (SearchProject, Background),
273    (SearchProjectResponse, Background),
274    (SendChannelMessage, Background),
275    (SendChannelMessageResponse, Background),
276    (ShareProject, Foreground),
277    (ShareProjectResponse, Foreground),
278    (ShowContacts, Foreground),
279    (StartLanguageServer, Foreground),
280    (SubscribeToChannels, Foreground),
281    (SynchronizeBuffers, Foreground),
282    (SynchronizeBuffersResponse, Foreground),
283    (TaskContextForLocation, Background),
284    (TaskContext, Background),
285    (TaskTemplates, Background),
286    (TaskTemplatesResponse, Background),
287    (Test, Foreground),
288    (Unfollow, Foreground),
289    (UnshareProject, Foreground),
290    (UpdateBuffer, Foreground),
291    (UpdateBufferFile, Foreground),
292    (UpdateChannelBuffer, Foreground),
293    (UpdateChannelBufferCollaborators, Foreground),
294    (UpdateChannels, Foreground),
295    (UpdateUserChannels, Foreground),
296    (UpdateContacts, Foreground),
297    (UpdateDiagnosticSummary, Foreground),
298    (UpdateDiffBase, Foreground),
299    (UpdateFollowers, Foreground),
300    (UpdateInviteInfo, Foreground),
301    (UpdateLanguageServer, Foreground),
302    (UpdateParticipantLocation, Foreground),
303    (UpdateProject, Foreground),
304    (UpdateProjectCollaborator, Foreground),
305    (UpdateWorktree, Foreground),
306    (UpdateWorktreeSettings, Foreground),
307    (UsersResponse, Foreground),
308    (LspExtExpandMacro, Background),
309    (LspExtExpandMacroResponse, Background),
310    (SetRoomParticipantRole, Foreground),
311    (BlameBuffer, Foreground),
312    (BlameBufferResponse, Foreground),
313    (CreateDevServerProject, Background),
314    (CreateDevServerProjectResponse, Foreground),
315    (CreateDevServer, Foreground),
316    (CreateDevServerResponse, Foreground),
317    (DevServerInstructions, Foreground),
318    (ShutdownDevServer, Foreground),
319    (ReconnectDevServer, Foreground),
320    (ReconnectDevServerResponse, Foreground),
321    (ShareDevServerProject, Foreground),
322    (JoinDevServerProject, Foreground),
323    (RejoinRemoteProjects, Foreground),
324    (RejoinRemoteProjectsResponse, Foreground),
325    (MultiLspQuery, Background),
326    (MultiLspQueryResponse, Background),
327    (DevServerProjectsUpdate, Foreground),
328    (ValidateDevServerProjectRequest, Background),
329    (DeleteDevServer, Foreground),
330    (DeleteDevServerProject, Foreground),
331    (RegenerateDevServerToken, Foreground),
332    (RegenerateDevServerTokenResponse, Foreground),
333    (RenameDevServer, Foreground),
334    (OpenNewBuffer, Foreground),
335    (RestartLanguageServers, Foreground),
336);
337
338request_messages!(
339    (ApplyCodeAction, ApplyCodeActionResponse),
340    (
341        ApplyCompletionAdditionalEdits,
342        ApplyCompletionAdditionalEditsResponse
343    ),
344    (Call, Ack),
345    (CancelCall, Ack),
346    (CopyProjectEntry, ProjectEntryResponse),
347    (CompleteWithLanguageModel, LanguageModelResponse),
348    (ComputeEmbeddings, ComputeEmbeddingsResponse),
349    (CountTokensWithLanguageModel, CountTokensResponse),
350    (CreateChannel, CreateChannelResponse),
351    (CreateProjectEntry, ProjectEntryResponse),
352    (CreateRoom, CreateRoomResponse),
353    (DeclineCall, Ack),
354    (DeleteChannel, Ack),
355    (DeleteProjectEntry, ProjectEntryResponse),
356    (ExpandProjectEntry, ExpandProjectEntryResponse),
357    (Follow, FollowResponse),
358    (FormatBuffers, FormatBuffersResponse),
359    (FuzzySearchUsers, UsersResponse),
360    (GetCachedEmbeddings, GetCachedEmbeddingsResponse),
361    (GetChannelMembers, GetChannelMembersResponse),
362    (GetChannelMessages, GetChannelMessagesResponse),
363    (GetChannelMessagesById, GetChannelMessagesResponse),
364    (GetCodeActions, GetCodeActionsResponse),
365    (GetCompletions, GetCompletionsResponse),
366    (GetDefinition, GetDefinitionResponse),
367    (GetImplementation, GetImplementationResponse),
368    (GetDocumentHighlights, GetDocumentHighlightsResponse),
369    (GetHover, GetHoverResponse),
370    (GetNotifications, GetNotificationsResponse),
371    (GetPrivateUserInfo, GetPrivateUserInfoResponse),
372    (GetProjectSymbols, GetProjectSymbolsResponse),
373    (GetReferences, GetReferencesResponse),
374    (GetSupermavenApiKey, GetSupermavenApiKeyResponse),
375    (GetTypeDefinition, GetTypeDefinitionResponse),
376    (GetUsers, UsersResponse),
377    (IncomingCall, Ack),
378    (InlayHints, InlayHintsResponse),
379    (InviteChannelMember, Ack),
380    (JoinChannel, JoinRoomResponse),
381    (JoinChannelBuffer, JoinChannelBufferResponse),
382    (JoinChannelChat, JoinChannelChatResponse),
383    (JoinHostedProject, JoinProjectResponse),
384    (JoinProject, JoinProjectResponse),
385    (JoinRoom, JoinRoomResponse),
386    (LeaveChannelBuffer, Ack),
387    (LeaveRoom, Ack),
388    (MarkNotificationRead, Ack),
389    (MoveChannel, Ack),
390    (OnTypeFormatting, OnTypeFormattingResponse),
391    (OpenBufferById, OpenBufferResponse),
392    (OpenBufferByPath, OpenBufferResponse),
393    (OpenBufferForSymbol, OpenBufferForSymbolResponse),
394    (OpenNewBuffer, OpenBufferResponse),
395    (PerformRename, PerformRenameResponse),
396    (Ping, Ack),
397    (PrepareRename, PrepareRenameResponse),
398    (RefreshInlayHints, Ack),
399    (RejoinChannelBuffers, RejoinChannelBuffersResponse),
400    (RejoinRoom, RejoinRoomResponse),
401    (ReloadBuffers, ReloadBuffersResponse),
402    (RemoveChannelMember, Ack),
403    (RemoveChannelMessage, Ack),
404    (UpdateChannelMessage, Ack),
405    (RemoveContact, Ack),
406    (RenameChannel, RenameChannelResponse),
407    (RenameProjectEntry, ProjectEntryResponse),
408    (RequestContact, Ack),
409    (
410        ResolveCompletionDocumentation,
411        ResolveCompletionDocumentationResponse
412    ),
413    (ResolveInlayHint, ResolveInlayHintResponse),
414    (RespondToChannelInvite, Ack),
415    (RespondToContactRequest, Ack),
416    (SaveBuffer, BufferSaved),
417    (SearchProject, SearchProjectResponse),
418    (SendChannelMessage, SendChannelMessageResponse),
419    (SetChannelMemberRole, Ack),
420    (SetChannelVisibility, Ack),
421    (ShareProject, ShareProjectResponse),
422    (SynchronizeBuffers, SynchronizeBuffersResponse),
423    (TaskContextForLocation, TaskContext),
424    (TaskTemplates, TaskTemplatesResponse),
425    (Test, Test),
426    (UpdateBuffer, Ack),
427    (UpdateParticipantLocation, Ack),
428    (UpdateProject, Ack),
429    (UpdateWorktree, Ack),
430    (LspExtExpandMacro, LspExtExpandMacroResponse),
431    (SetRoomParticipantRole, Ack),
432    (BlameBuffer, BlameBufferResponse),
433    (CreateDevServerProject, CreateDevServerProjectResponse),
434    (CreateDevServer, CreateDevServerResponse),
435    (ShutdownDevServer, Ack),
436    (ShareDevServerProject, ShareProjectResponse),
437    (JoinDevServerProject, JoinProjectResponse),
438    (RejoinRemoteProjects, RejoinRemoteProjectsResponse),
439    (ReconnectDevServer, ReconnectDevServerResponse),
440    (ValidateDevServerProjectRequest, Ack),
441    (MultiLspQuery, MultiLspQueryResponse),
442    (DeleteDevServer, Ack),
443    (DeleteDevServerProject, Ack),
444    (RegenerateDevServerToken, RegenerateDevServerTokenResponse),
445    (RenameDevServer, Ack),
446    (RestartLanguageServers, Ack)
447);
448
449entity_messages!(
450    {project_id, ShareProject},
451    AddProjectCollaborator,
452    ApplyCodeAction,
453    ApplyCompletionAdditionalEdits,
454    BlameBuffer,
455    BufferReloaded,
456    BufferSaved,
457    CopyProjectEntry,
458    CreateBufferForPeer,
459    CreateProjectEntry,
460    DeleteProjectEntry,
461    ExpandProjectEntry,
462    FormatBuffers,
463    GetCodeActions,
464    GetCompletions,
465    GetDefinition,
466    GetImplementation,
467    GetDocumentHighlights,
468    GetHover,
469    GetProjectSymbols,
470    GetReferences,
471    GetTypeDefinition,
472    InlayHints,
473    JoinProject,
474    LeaveProject,
475    MultiLspQuery,
476    RestartLanguageServers,
477    OnTypeFormatting,
478    OpenNewBuffer,
479    OpenBufferById,
480    OpenBufferByPath,
481    OpenBufferForSymbol,
482    PerformRename,
483    PrepareRename,
484    RefreshInlayHints,
485    ReloadBuffers,
486    RemoveProjectCollaborator,
487    RenameProjectEntry,
488    ResolveCompletionDocumentation,
489    ResolveInlayHint,
490    SaveBuffer,
491    SearchProject,
492    StartLanguageServer,
493    SynchronizeBuffers,
494    TaskContextForLocation,
495    TaskTemplates,
496    UnshareProject,
497    UpdateBuffer,
498    UpdateBufferFile,
499    UpdateDiagnosticSummary,
500    UpdateDiffBase,
501    UpdateLanguageServer,
502    UpdateProject,
503    UpdateProjectCollaborator,
504    UpdateWorktree,
505    UpdateWorktreeSettings,
506    LspExtExpandMacro,
507);
508
509entity_messages!(
510    {channel_id, Channel},
511    ChannelMessageSent,
512    ChannelMessageUpdate,
513    RemoveChannelMessage,
514    UpdateChannelMessage,
515    UpdateChannelBuffer,
516    UpdateChannelBufferCollaborators,
517);
518
519const KIB: usize = 1024;
520const MIB: usize = KIB * 1024;
521const MAX_BUFFER_LEN: usize = MIB;
522
523/// A stream of protobuf messages.
524pub struct MessageStream<S> {
525    stream: S,
526    encoding_buffer: Vec<u8>,
527}
528
529#[allow(clippy::large_enum_variant)]
530#[derive(Debug)]
531pub enum Message {
532    Envelope(Envelope),
533    Ping,
534    Pong,
535}
536
537impl<S> MessageStream<S> {
538    pub fn new(stream: S) -> Self {
539        Self {
540            stream,
541            encoding_buffer: Vec::new(),
542        }
543    }
544
545    pub fn inner_mut(&mut self) -> &mut S {
546        &mut self.stream
547    }
548}
549
550impl<S> MessageStream<S>
551where
552    S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
553{
554    pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
555        #[cfg(any(test, feature = "test-support"))]
556        const COMPRESSION_LEVEL: i32 = -7;
557
558        #[cfg(not(any(test, feature = "test-support")))]
559        const COMPRESSION_LEVEL: i32 = 4;
560
561        match message {
562            Message::Envelope(message) => {
563                self.encoding_buffer.reserve(message.encoded_len());
564                message
565                    .encode(&mut self.encoding_buffer)
566                    .map_err(io::Error::from)?;
567                let buffer =
568                    zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
569                        .unwrap();
570
571                self.encoding_buffer.clear();
572                self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
573                self.stream.send(WebSocketMessage::Binary(buffer)).await?;
574            }
575            Message::Ping => {
576                self.stream
577                    .send(WebSocketMessage::Ping(Default::default()))
578                    .await?;
579            }
580            Message::Pong => {
581                self.stream
582                    .send(WebSocketMessage::Pong(Default::default()))
583                    .await?;
584            }
585        }
586
587        Ok(())
588    }
589}
590
591impl<S> MessageStream<S>
592where
593    S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
594{
595    pub async fn read(&mut self) -> Result<(Message, Instant), anyhow::Error> {
596        while let Some(bytes) = self.stream.next().await {
597            let received_at = Instant::now();
598            match bytes? {
599                WebSocketMessage::Binary(bytes) => {
600                    zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
601                    let envelope = Envelope::decode(self.encoding_buffer.as_slice())
602                        .map_err(io::Error::from)?;
603
604                    self.encoding_buffer.clear();
605                    self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
606                    return Ok((Message::Envelope(envelope), received_at));
607                }
608                WebSocketMessage::Ping(_) => return Ok((Message::Ping, received_at)),
609                WebSocketMessage::Pong(_) => return Ok((Message::Pong, received_at)),
610                WebSocketMessage::Close(_) => break,
611                _ => {}
612            }
613        }
614        Err(anyhow!("connection closed"))
615    }
616}
617
618impl From<Timestamp> for SystemTime {
619    fn from(val: Timestamp) -> Self {
620        UNIX_EPOCH
621            .checked_add(Duration::new(val.seconds, val.nanos))
622            .unwrap()
623    }
624}
625
626impl From<SystemTime> for Timestamp {
627    fn from(time: SystemTime) -> Self {
628        let duration = time.duration_since(UNIX_EPOCH).unwrap();
629        Self {
630            seconds: duration.as_secs(),
631            nanos: duration.subsec_nanos(),
632        }
633    }
634}
635
636impl From<u128> for Nonce {
637    fn from(nonce: u128) -> Self {
638        let upper_half = (nonce >> 64) as u64;
639        let lower_half = nonce as u64;
640        Self {
641            upper_half,
642            lower_half,
643        }
644    }
645}
646
647impl From<Nonce> for u128 {
648    fn from(nonce: Nonce) -> Self {
649        let upper_half = (nonce.upper_half as u128) << 64;
650        let lower_half = nonce.lower_half as u128;
651        upper_half | lower_half
652    }
653}
654
655pub fn split_worktree_update(
656    mut message: UpdateWorktree,
657    max_chunk_size: usize,
658) -> impl Iterator<Item = UpdateWorktree> {
659    let mut done_files = false;
660
661    let mut repository_map = message
662        .updated_repositories
663        .into_iter()
664        .map(|repo| (repo.work_directory_id, repo))
665        .collect::<HashMap<_, _>>();
666
667    iter::from_fn(move || {
668        if done_files {
669            return None;
670        }
671
672        let updated_entries_chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
673        let updated_entries: Vec<_> = message
674            .updated_entries
675            .drain(..updated_entries_chunk_size)
676            .collect();
677
678        let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size);
679        let removed_entries = message
680            .removed_entries
681            .drain(..removed_entries_chunk_size)
682            .collect();
683
684        done_files = message.updated_entries.is_empty() && message.removed_entries.is_empty();
685
686        let mut updated_repositories = Vec::new();
687
688        if !repository_map.is_empty() {
689            for entry in &updated_entries {
690                if let Some(repo) = repository_map.remove(&entry.id) {
691                    updated_repositories.push(repo)
692                }
693            }
694        }
695
696        let removed_repositories = if done_files {
697            mem::take(&mut message.removed_repositories)
698        } else {
699            Default::default()
700        };
701
702        if done_files {
703            updated_repositories.extend(mem::take(&mut repository_map).into_values());
704        }
705
706        Some(UpdateWorktree {
707            project_id: message.project_id,
708            worktree_id: message.worktree_id,
709            root_name: message.root_name.clone(),
710            abs_path: message.abs_path.clone(),
711            updated_entries,
712            removed_entries,
713            scan_id: message.scan_id,
714            is_last_update: done_files && message.is_last_update,
715            updated_repositories,
716            removed_repositories,
717        })
718    })
719}
720
721#[cfg(test)]
722mod tests {
723    use super::*;
724
725    #[gpui::test]
726    async fn test_buffer_size() {
727        let (tx, rx) = futures::channel::mpsc::unbounded();
728        let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
729        sink.write(Message::Envelope(Envelope {
730            payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
731                root_name: "abcdefg".repeat(10),
732                ..Default::default()
733            })),
734            ..Default::default()
735        }))
736        .await
737        .unwrap();
738        assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
739        sink.write(Message::Envelope(Envelope {
740            payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
741                root_name: "abcdefg".repeat(1000000),
742                ..Default::default()
743            })),
744            ..Default::default()
745        }))
746        .await
747        .unwrap();
748        assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
749
750        let mut stream = MessageStream::new(rx.map(anyhow::Ok));
751        stream.read().await.unwrap();
752        assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
753        stream.read().await.unwrap();
754        assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
755    }
756
757    #[gpui::test]
758    fn test_converting_peer_id_from_and_to_u64() {
759        let peer_id = PeerId {
760            owner_id: 10,
761            id: 3,
762        };
763        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
764        let peer_id = PeerId {
765            owner_id: u32::MAX,
766            id: 3,
767        };
768        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
769        let peer_id = PeerId {
770            owner_id: 10,
771            id: u32::MAX,
772        };
773        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
774        let peer_id = PeerId {
775            owner_id: u32::MAX,
776            id: u32::MAX,
777        };
778        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
779    }
780}