proto.rs

  1#![allow(non_snake_case)]
  2
  3use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
  4use anyhow::{anyhow, Result};
  5use async_tungstenite::tungstenite::Message as WebSocketMessage;
  6use collections::HashMap;
  7use futures::{SinkExt as _, StreamExt as _};
  8use prost::Message as _;
  9use serde::Serialize;
 10use std::any::{Any, TypeId};
 11use std::time::Instant;
 12use std::{
 13    cmp,
 14    fmt::Debug,
 15    io, iter,
 16    time::{Duration, SystemTime, UNIX_EPOCH},
 17};
 18use std::{fmt, mem};
 19
 20include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
 21
 22pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
 23    const NAME: &'static str;
 24    const PRIORITY: MessagePriority;
 25    fn into_envelope(
 26        self,
 27        id: u32,
 28        responding_to: Option<u32>,
 29        original_sender_id: Option<PeerId>,
 30    ) -> Envelope;
 31    fn from_envelope(envelope: Envelope) -> Option<Self>;
 32}
 33
 34pub trait EntityMessage: EnvelopedMessage {
 35    type Entity;
 36    fn remote_entity_id(&self) -> u64;
 37}
 38
 39pub trait RequestMessage: EnvelopedMessage {
 40    type Response: EnvelopedMessage;
 41}
 42
 43pub trait AnyTypedEnvelope: 'static + Send + Sync {
 44    fn payload_type_id(&self) -> TypeId;
 45    fn payload_type_name(&self) -> &'static str;
 46    fn as_any(&self) -> &dyn Any;
 47    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
 48    fn is_background(&self) -> bool;
 49    fn original_sender_id(&self) -> Option<PeerId>;
 50    fn sender_id(&self) -> ConnectionId;
 51    fn message_id(&self) -> u32;
 52}
 53
 54pub enum MessagePriority {
 55    Foreground,
 56    Background,
 57}
 58
 59impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
 60    fn payload_type_id(&self) -> TypeId {
 61        TypeId::of::<T>()
 62    }
 63
 64    fn payload_type_name(&self) -> &'static str {
 65        T::NAME
 66    }
 67
 68    fn as_any(&self) -> &dyn Any {
 69        self
 70    }
 71
 72    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
 73        self
 74    }
 75
 76    fn is_background(&self) -> bool {
 77        matches!(T::PRIORITY, MessagePriority::Background)
 78    }
 79
 80    fn original_sender_id(&self) -> Option<PeerId> {
 81        self.original_sender_id
 82    }
 83
 84    fn sender_id(&self) -> ConnectionId {
 85        self.sender_id
 86    }
 87
 88    fn message_id(&self) -> u32 {
 89        self.message_id
 90    }
 91}
 92
 93impl PeerId {
 94    pub fn from_u64(peer_id: u64) -> Self {
 95        let owner_id = (peer_id >> 32) as u32;
 96        let id = peer_id as u32;
 97        Self { owner_id, id }
 98    }
 99
100    pub fn as_u64(self) -> u64 {
101        ((self.owner_id as u64) << 32) | (self.id as u64)
102    }
103}
104
105impl Copy for PeerId {}
106
107impl Eq for PeerId {}
108
109impl Ord for PeerId {
110    fn cmp(&self, other: &Self) -> cmp::Ordering {
111        self.owner_id
112            .cmp(&other.owner_id)
113            .then_with(|| self.id.cmp(&other.id))
114    }
115}
116
117impl PartialOrd for PeerId {
118    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
119        Some(self.cmp(other))
120    }
121}
122
123impl std::hash::Hash for PeerId {
124    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
125        self.owner_id.hash(state);
126        self.id.hash(state);
127    }
128}
129
130impl fmt::Display for PeerId {
131    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
132        write!(f, "{}/{}", self.owner_id, self.id)
133    }
134}
135
136messages!(
137    (Ack, Foreground),
138    (AckBufferOperation, Background),
139    (AckChannelMessage, Background),
140    (AddNotification, Foreground),
141    (AddProjectCollaborator, Foreground),
142    (ApplyCodeAction, Background),
143    (ApplyCodeActionResponse, Background),
144    (ApplyCompletionAdditionalEdits, Background),
145    (ApplyCompletionAdditionalEditsResponse, Background),
146    (BufferReloaded, Foreground),
147    (BufferSaved, Foreground),
148    (Call, Foreground),
149    (CallCanceled, Foreground),
150    (CancelCall, Foreground),
151    (ChannelMessageSent, Foreground),
152    (ChannelMessageUpdate, Foreground),
153    (CompleteWithLanguageModel, Background),
154    (ComputeEmbeddings, Background),
155    (ComputeEmbeddingsResponse, Background),
156    (CopyProjectEntry, Foreground),
157    (CountTokensWithLanguageModel, Background),
158    (CountTokensResponse, Background),
159    (CreateBufferForPeer, Foreground),
160    (CreateChannel, Foreground),
161    (CreateChannelResponse, Foreground),
162    (CreateProjectEntry, Foreground),
163    (CreateRoom, Foreground),
164    (CreateRoomResponse, Foreground),
165    (DeclineCall, Foreground),
166    (DeleteChannel, Foreground),
167    (DeleteNotification, Foreground),
168    (UpdateNotification, Foreground),
169    (DeleteProjectEntry, Foreground),
170    (EndStream, Foreground),
171    (Error, Foreground),
172    (ExpandProjectEntry, Foreground),
173    (ExpandProjectEntryResponse, Foreground),
174    (Follow, Foreground),
175    (FollowResponse, Foreground),
176    (FormatBuffers, Foreground),
177    (FormatBuffersResponse, Foreground),
178    (FuzzySearchUsers, Foreground),
179    (GetCachedEmbeddings, Background),
180    (GetCachedEmbeddingsResponse, Background),
181    (GetChannelMembers, Foreground),
182    (GetChannelMembersResponse, Foreground),
183    (GetChannelMessages, Background),
184    (GetChannelMessagesById, Background),
185    (GetChannelMessagesResponse, Background),
186    (GetCodeActions, Background),
187    (GetCodeActionsResponse, Background),
188    (GetCompletions, Background),
189    (GetCompletionsResponse, Background),
190    (GetDefinition, Background),
191    (GetDefinitionResponse, Background),
192    (GetDocumentHighlights, Background),
193    (GetDocumentHighlightsResponse, Background),
194    (GetHover, Background),
195    (GetHoverResponse, Background),
196    (GetNotifications, Foreground),
197    (GetNotificationsResponse, Foreground),
198    (GetPrivateUserInfo, Foreground),
199    (GetPrivateUserInfoResponse, Foreground),
200    (GetProjectSymbols, Background),
201    (GetProjectSymbolsResponse, Background),
202    (GetReferences, Background),
203    (GetReferencesResponse, Background),
204    (GetSupermavenApiKey, Background),
205    (GetSupermavenApiKeyResponse, Background),
206    (GetTypeDefinition, Background),
207    (GetTypeDefinitionResponse, Background),
208    (GetImplementation, Background),
209    (GetImplementationResponse, Background),
210    (GetUsers, Foreground),
211    (Hello, Foreground),
212    (IncomingCall, Foreground),
213    (InlayHints, Background),
214    (InlayHintsResponse, Background),
215    (InviteChannelMember, Foreground),
216    (JoinChannel, Foreground),
217    (JoinChannelBuffer, Foreground),
218    (JoinChannelBufferResponse, Foreground),
219    (JoinChannelChat, Foreground),
220    (JoinChannelChatResponse, Foreground),
221    (JoinProject, Foreground),
222    (JoinHostedProject, Foreground),
223    (JoinProjectResponse, Foreground),
224    (JoinRoom, Foreground),
225    (JoinRoomResponse, Foreground),
226    (LanguageModelResponse, Background),
227    (LeaveChannelBuffer, Background),
228    (LeaveChannelChat, Foreground),
229    (LeaveProject, Foreground),
230    (LeaveRoom, Foreground),
231    (MarkNotificationRead, Foreground),
232    (MoveChannel, Foreground),
233    (OnTypeFormatting, Background),
234    (OnTypeFormattingResponse, Background),
235    (OpenBufferById, Background),
236    (OpenBufferByPath, Background),
237    (OpenBufferForSymbol, Background),
238    (OpenBufferForSymbolResponse, Background),
239    (OpenBufferResponse, Background),
240    (PerformRename, Background),
241    (PerformRenameResponse, Background),
242    (Ping, Foreground),
243    (PrepareRename, Background),
244    (PrepareRenameResponse, Background),
245    (ProjectEntryResponse, Foreground),
246    (RefreshInlayHints, Foreground),
247    (RejoinChannelBuffers, Foreground),
248    (RejoinChannelBuffersResponse, Foreground),
249    (RejoinRoom, Foreground),
250    (RejoinRoomResponse, Foreground),
251    (ReloadBuffers, Foreground),
252    (ReloadBuffersResponse, Foreground),
253    (RemoveChannelMember, Foreground),
254    (RemoveChannelMessage, Foreground),
255    (UpdateChannelMessage, Foreground),
256    (RemoveContact, Foreground),
257    (RemoveProjectCollaborator, Foreground),
258    (RenameChannel, Foreground),
259    (RenameChannelResponse, Foreground),
260    (RenameProjectEntry, Foreground),
261    (RequestContact, Foreground),
262    (ResolveCompletionDocumentation, Background),
263    (ResolveCompletionDocumentationResponse, Background),
264    (ResolveInlayHint, Background),
265    (ResolveInlayHintResponse, Background),
266    (RespondToChannelInvite, Foreground),
267    (RespondToContactRequest, Foreground),
268    (RoomUpdated, Foreground),
269    (SaveBuffer, Foreground),
270    (SetChannelMemberRole, Foreground),
271    (SetChannelVisibility, Foreground),
272    (SearchProject, Background),
273    (SearchProjectResponse, Background),
274    (SendChannelMessage, Background),
275    (SendChannelMessageResponse, Background),
276    (ShareProject, Foreground),
277    (ShareProjectResponse, Foreground),
278    (ShowContacts, Foreground),
279    (StartLanguageServer, Foreground),
280    (SubscribeToChannels, Foreground),
281    (SynchronizeBuffers, Foreground),
282    (SynchronizeBuffersResponse, Foreground),
283    (TaskContextForLocation, Background),
284    (TaskContext, Background),
285    (TaskTemplates, Background),
286    (TaskTemplatesResponse, Background),
287    (Test, Foreground),
288    (Unfollow, Foreground),
289    (UnshareProject, Foreground),
290    (UpdateBuffer, Foreground),
291    (UpdateBufferFile, Foreground),
292    (UpdateChannelBuffer, Foreground),
293    (UpdateChannelBufferCollaborators, Foreground),
294    (UpdateChannels, Foreground),
295    (UpdateUserChannels, Foreground),
296    (UpdateContacts, Foreground),
297    (UpdateDiagnosticSummary, Foreground),
298    (UpdateDiffBase, Foreground),
299    (UpdateFollowers, Foreground),
300    (UpdateInviteInfo, Foreground),
301    (UpdateLanguageServer, Foreground),
302    (UpdateParticipantLocation, Foreground),
303    (UpdateProject, Foreground),
304    (UpdateProjectCollaborator, Foreground),
305    (UpdateWorktree, Foreground),
306    (UpdateWorktreeSettings, Foreground),
307    (UsersResponse, Foreground),
308    (LspExtExpandMacro, Background),
309    (LspExtExpandMacroResponse, Background),
310    (SetRoomParticipantRole, Foreground),
311    (BlameBuffer, Foreground),
312    (BlameBufferResponse, Foreground),
313    (CreateDevServerProject, Background),
314    (CreateDevServerProjectResponse, Foreground),
315    (CreateDevServer, Foreground),
316    (CreateDevServerResponse, Foreground),
317    (DevServerInstructions, Foreground),
318    (ShutdownDevServer, Foreground),
319    (ReconnectDevServer, Foreground),
320    (ReconnectDevServerResponse, Foreground),
321    (ShareDevServerProject, Foreground),
322    (JoinDevServerProject, Foreground),
323    (RejoinRemoteProjects, Foreground),
324    (RejoinRemoteProjectsResponse, Foreground),
325    (MultiLspQuery, Background),
326    (MultiLspQueryResponse, Background),
327    (DevServerProjectsUpdate, Foreground),
328    (ValidateDevServerProjectRequest, Background),
329    (DeleteDevServer, Foreground),
330    (DeleteDevServerProject, Foreground),
331    (RegenerateDevServerToken, Foreground),
332    (RegenerateDevServerTokenResponse, Foreground),
333    (RenameDevServer, Foreground),
334    (OpenNewBuffer, Foreground),
335);
336
337request_messages!(
338    (ApplyCodeAction, ApplyCodeActionResponse),
339    (
340        ApplyCompletionAdditionalEdits,
341        ApplyCompletionAdditionalEditsResponse
342    ),
343    (Call, Ack),
344    (CancelCall, Ack),
345    (CopyProjectEntry, ProjectEntryResponse),
346    (CompleteWithLanguageModel, LanguageModelResponse),
347    (ComputeEmbeddings, ComputeEmbeddingsResponse),
348    (CountTokensWithLanguageModel, CountTokensResponse),
349    (CreateChannel, CreateChannelResponse),
350    (CreateProjectEntry, ProjectEntryResponse),
351    (CreateRoom, CreateRoomResponse),
352    (DeclineCall, Ack),
353    (DeleteChannel, Ack),
354    (DeleteProjectEntry, ProjectEntryResponse),
355    (ExpandProjectEntry, ExpandProjectEntryResponse),
356    (Follow, FollowResponse),
357    (FormatBuffers, FormatBuffersResponse),
358    (FuzzySearchUsers, UsersResponse),
359    (GetCachedEmbeddings, GetCachedEmbeddingsResponse),
360    (GetChannelMembers, GetChannelMembersResponse),
361    (GetChannelMessages, GetChannelMessagesResponse),
362    (GetChannelMessagesById, GetChannelMessagesResponse),
363    (GetCodeActions, GetCodeActionsResponse),
364    (GetCompletions, GetCompletionsResponse),
365    (GetDefinition, GetDefinitionResponse),
366    (GetImplementation, GetImplementationResponse),
367    (GetDocumentHighlights, GetDocumentHighlightsResponse),
368    (GetHover, GetHoverResponse),
369    (GetNotifications, GetNotificationsResponse),
370    (GetPrivateUserInfo, GetPrivateUserInfoResponse),
371    (GetProjectSymbols, GetProjectSymbolsResponse),
372    (GetReferences, GetReferencesResponse),
373    (GetSupermavenApiKey, GetSupermavenApiKeyResponse),
374    (GetTypeDefinition, GetTypeDefinitionResponse),
375    (GetUsers, UsersResponse),
376    (IncomingCall, Ack),
377    (InlayHints, InlayHintsResponse),
378    (InviteChannelMember, Ack),
379    (JoinChannel, JoinRoomResponse),
380    (JoinChannelBuffer, JoinChannelBufferResponse),
381    (JoinChannelChat, JoinChannelChatResponse),
382    (JoinHostedProject, JoinProjectResponse),
383    (JoinProject, JoinProjectResponse),
384    (JoinRoom, JoinRoomResponse),
385    (LeaveChannelBuffer, Ack),
386    (LeaveRoom, Ack),
387    (MarkNotificationRead, Ack),
388    (MoveChannel, Ack),
389    (OnTypeFormatting, OnTypeFormattingResponse),
390    (OpenBufferById, OpenBufferResponse),
391    (OpenBufferByPath, OpenBufferResponse),
392    (OpenBufferForSymbol, OpenBufferForSymbolResponse),
393    (OpenNewBuffer, OpenBufferResponse),
394    (PerformRename, PerformRenameResponse),
395    (Ping, Ack),
396    (PrepareRename, PrepareRenameResponse),
397    (RefreshInlayHints, Ack),
398    (RejoinChannelBuffers, RejoinChannelBuffersResponse),
399    (RejoinRoom, RejoinRoomResponse),
400    (ReloadBuffers, ReloadBuffersResponse),
401    (RemoveChannelMember, Ack),
402    (RemoveChannelMessage, Ack),
403    (UpdateChannelMessage, Ack),
404    (RemoveContact, Ack),
405    (RenameChannel, RenameChannelResponse),
406    (RenameProjectEntry, ProjectEntryResponse),
407    (RequestContact, Ack),
408    (
409        ResolveCompletionDocumentation,
410        ResolveCompletionDocumentationResponse
411    ),
412    (ResolveInlayHint, ResolveInlayHintResponse),
413    (RespondToChannelInvite, Ack),
414    (RespondToContactRequest, Ack),
415    (SaveBuffer, BufferSaved),
416    (SearchProject, SearchProjectResponse),
417    (SendChannelMessage, SendChannelMessageResponse),
418    (SetChannelMemberRole, Ack),
419    (SetChannelVisibility, Ack),
420    (ShareProject, ShareProjectResponse),
421    (SynchronizeBuffers, SynchronizeBuffersResponse),
422    (TaskContextForLocation, TaskContext),
423    (TaskTemplates, TaskTemplatesResponse),
424    (Test, Test),
425    (UpdateBuffer, Ack),
426    (UpdateParticipantLocation, Ack),
427    (UpdateProject, Ack),
428    (UpdateWorktree, Ack),
429    (LspExtExpandMacro, LspExtExpandMacroResponse),
430    (SetRoomParticipantRole, Ack),
431    (BlameBuffer, BlameBufferResponse),
432    (CreateDevServerProject, CreateDevServerProjectResponse),
433    (CreateDevServer, CreateDevServerResponse),
434    (ShutdownDevServer, Ack),
435    (ShareDevServerProject, ShareProjectResponse),
436    (JoinDevServerProject, JoinProjectResponse),
437    (RejoinRemoteProjects, RejoinRemoteProjectsResponse),
438    (ReconnectDevServer, ReconnectDevServerResponse),
439    (ValidateDevServerProjectRequest, Ack),
440    (MultiLspQuery, MultiLspQueryResponse),
441    (DeleteDevServer, Ack),
442    (DeleteDevServerProject, Ack),
443    (RegenerateDevServerToken, RegenerateDevServerTokenResponse),
444    (RenameDevServer, Ack)
445);
446
447entity_messages!(
448    {project_id, ShareProject},
449    AddProjectCollaborator,
450    ApplyCodeAction,
451    ApplyCompletionAdditionalEdits,
452    BlameBuffer,
453    BufferReloaded,
454    BufferSaved,
455    CopyProjectEntry,
456    CreateBufferForPeer,
457    CreateProjectEntry,
458    DeleteProjectEntry,
459    ExpandProjectEntry,
460    FormatBuffers,
461    GetCodeActions,
462    GetCompletions,
463    GetDefinition,
464    GetImplementation,
465    GetDocumentHighlights,
466    GetHover,
467    GetProjectSymbols,
468    GetReferences,
469    GetTypeDefinition,
470    InlayHints,
471    JoinProject,
472    LeaveProject,
473    MultiLspQuery,
474    OnTypeFormatting,
475    OpenNewBuffer,
476    OpenBufferById,
477    OpenBufferByPath,
478    OpenBufferForSymbol,
479    PerformRename,
480    PrepareRename,
481    RefreshInlayHints,
482    ReloadBuffers,
483    RemoveProjectCollaborator,
484    RenameProjectEntry,
485    ResolveCompletionDocumentation,
486    ResolveInlayHint,
487    SaveBuffer,
488    SearchProject,
489    StartLanguageServer,
490    SynchronizeBuffers,
491    TaskContextForLocation,
492    TaskTemplates,
493    UnshareProject,
494    UpdateBuffer,
495    UpdateBufferFile,
496    UpdateDiagnosticSummary,
497    UpdateDiffBase,
498    UpdateLanguageServer,
499    UpdateProject,
500    UpdateProjectCollaborator,
501    UpdateWorktree,
502    UpdateWorktreeSettings,
503    LspExtExpandMacro,
504);
505
506entity_messages!(
507    {channel_id, Channel},
508    ChannelMessageSent,
509    ChannelMessageUpdate,
510    RemoveChannelMessage,
511    UpdateChannelMessage,
512    UpdateChannelBuffer,
513    UpdateChannelBufferCollaborators,
514);
515
516const KIB: usize = 1024;
517const MIB: usize = KIB * 1024;
518const MAX_BUFFER_LEN: usize = MIB;
519
520/// A stream of protobuf messages.
521pub struct MessageStream<S> {
522    stream: S,
523    encoding_buffer: Vec<u8>,
524}
525
526#[allow(clippy::large_enum_variant)]
527#[derive(Debug)]
528pub enum Message {
529    Envelope(Envelope),
530    Ping,
531    Pong,
532}
533
534impl<S> MessageStream<S> {
535    pub fn new(stream: S) -> Self {
536        Self {
537            stream,
538            encoding_buffer: Vec::new(),
539        }
540    }
541
542    pub fn inner_mut(&mut self) -> &mut S {
543        &mut self.stream
544    }
545}
546
547impl<S> MessageStream<S>
548where
549    S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
550{
551    pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
552        #[cfg(any(test, feature = "test-support"))]
553        const COMPRESSION_LEVEL: i32 = -7;
554
555        #[cfg(not(any(test, feature = "test-support")))]
556        const COMPRESSION_LEVEL: i32 = 4;
557
558        match message {
559            Message::Envelope(message) => {
560                self.encoding_buffer.reserve(message.encoded_len());
561                message
562                    .encode(&mut self.encoding_buffer)
563                    .map_err(io::Error::from)?;
564                let buffer =
565                    zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
566                        .unwrap();
567
568                self.encoding_buffer.clear();
569                self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
570                self.stream.send(WebSocketMessage::Binary(buffer)).await?;
571            }
572            Message::Ping => {
573                self.stream
574                    .send(WebSocketMessage::Ping(Default::default()))
575                    .await?;
576            }
577            Message::Pong => {
578                self.stream
579                    .send(WebSocketMessage::Pong(Default::default()))
580                    .await?;
581            }
582        }
583
584        Ok(())
585    }
586}
587
588impl<S> MessageStream<S>
589where
590    S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
591{
592    pub async fn read(&mut self) -> Result<(Message, Instant), anyhow::Error> {
593        while let Some(bytes) = self.stream.next().await {
594            let received_at = Instant::now();
595            match bytes? {
596                WebSocketMessage::Binary(bytes) => {
597                    zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
598                    let envelope = Envelope::decode(self.encoding_buffer.as_slice())
599                        .map_err(io::Error::from)?;
600
601                    self.encoding_buffer.clear();
602                    self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
603                    return Ok((Message::Envelope(envelope), received_at));
604                }
605                WebSocketMessage::Ping(_) => return Ok((Message::Ping, received_at)),
606                WebSocketMessage::Pong(_) => return Ok((Message::Pong, received_at)),
607                WebSocketMessage::Close(_) => break,
608                _ => {}
609            }
610        }
611        Err(anyhow!("connection closed"))
612    }
613}
614
615impl From<Timestamp> for SystemTime {
616    fn from(val: Timestamp) -> Self {
617        UNIX_EPOCH
618            .checked_add(Duration::new(val.seconds, val.nanos))
619            .unwrap()
620    }
621}
622
623impl From<SystemTime> for Timestamp {
624    fn from(time: SystemTime) -> Self {
625        let duration = time.duration_since(UNIX_EPOCH).unwrap();
626        Self {
627            seconds: duration.as_secs(),
628            nanos: duration.subsec_nanos(),
629        }
630    }
631}
632
633impl From<u128> for Nonce {
634    fn from(nonce: u128) -> Self {
635        let upper_half = (nonce >> 64) as u64;
636        let lower_half = nonce as u64;
637        Self {
638            upper_half,
639            lower_half,
640        }
641    }
642}
643
644impl From<Nonce> for u128 {
645    fn from(nonce: Nonce) -> Self {
646        let upper_half = (nonce.upper_half as u128) << 64;
647        let lower_half = nonce.lower_half as u128;
648        upper_half | lower_half
649    }
650}
651
652pub fn split_worktree_update(
653    mut message: UpdateWorktree,
654    max_chunk_size: usize,
655) -> impl Iterator<Item = UpdateWorktree> {
656    let mut done_files = false;
657
658    let mut repository_map = message
659        .updated_repositories
660        .into_iter()
661        .map(|repo| (repo.work_directory_id, repo))
662        .collect::<HashMap<_, _>>();
663
664    iter::from_fn(move || {
665        if done_files {
666            return None;
667        }
668
669        let updated_entries_chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
670        let updated_entries: Vec<_> = message
671            .updated_entries
672            .drain(..updated_entries_chunk_size)
673            .collect();
674
675        let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size);
676        let removed_entries = message
677            .removed_entries
678            .drain(..removed_entries_chunk_size)
679            .collect();
680
681        done_files = message.updated_entries.is_empty() && message.removed_entries.is_empty();
682
683        let mut updated_repositories = Vec::new();
684
685        if !repository_map.is_empty() {
686            for entry in &updated_entries {
687                if let Some(repo) = repository_map.remove(&entry.id) {
688                    updated_repositories.push(repo)
689                }
690            }
691        }
692
693        let removed_repositories = if done_files {
694            mem::take(&mut message.removed_repositories)
695        } else {
696            Default::default()
697        };
698
699        if done_files {
700            updated_repositories.extend(mem::take(&mut repository_map).into_values());
701        }
702
703        Some(UpdateWorktree {
704            project_id: message.project_id,
705            worktree_id: message.worktree_id,
706            root_name: message.root_name.clone(),
707            abs_path: message.abs_path.clone(),
708            updated_entries,
709            removed_entries,
710            scan_id: message.scan_id,
711            is_last_update: done_files && message.is_last_update,
712            updated_repositories,
713            removed_repositories,
714        })
715    })
716}
717
718#[cfg(test)]
719mod tests {
720    use super::*;
721
722    #[gpui::test]
723    async fn test_buffer_size() {
724        let (tx, rx) = futures::channel::mpsc::unbounded();
725        let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
726        sink.write(Message::Envelope(Envelope {
727            payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
728                root_name: "abcdefg".repeat(10),
729                ..Default::default()
730            })),
731            ..Default::default()
732        }))
733        .await
734        .unwrap();
735        assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
736        sink.write(Message::Envelope(Envelope {
737            payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
738                root_name: "abcdefg".repeat(1000000),
739                ..Default::default()
740            })),
741            ..Default::default()
742        }))
743        .await
744        .unwrap();
745        assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
746
747        let mut stream = MessageStream::new(rx.map(anyhow::Ok));
748        stream.read().await.unwrap();
749        assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
750        stream.read().await.unwrap();
751        assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
752    }
753
754    #[gpui::test]
755    fn test_converting_peer_id_from_and_to_u64() {
756        let peer_id = PeerId {
757            owner_id: 10,
758            id: 3,
759        };
760        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
761        let peer_id = PeerId {
762            owner_id: u32::MAX,
763            id: 3,
764        };
765        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
766        let peer_id = PeerId {
767            owner_id: 10,
768            id: u32::MAX,
769        };
770        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
771        let peer_id = PeerId {
772            owner_id: u32::MAX,
773            id: u32::MAX,
774        };
775        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
776    }
777}