proto.rs

  1#![allow(non_snake_case)]
  2
  3use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
  4use anyhow::{anyhow, Result};
  5use async_tungstenite::tungstenite::Message as WebSocketMessage;
  6use collections::HashMap;
  7use futures::{SinkExt as _, StreamExt as _};
  8use prost::Message as _;
  9use serde::Serialize;
 10use std::any::{Any, TypeId};
 11use std::time::Instant;
 12use std::{
 13    cmp,
 14    fmt::Debug,
 15    io, iter,
 16    time::{Duration, SystemTime, UNIX_EPOCH},
 17};
 18use std::{fmt, mem};
 19
 20include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
 21
 22pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
 23    const NAME: &'static str;
 24    const PRIORITY: MessagePriority;
 25    fn into_envelope(
 26        self,
 27        id: u32,
 28        responding_to: Option<u32>,
 29        original_sender_id: Option<PeerId>,
 30    ) -> Envelope;
 31    fn from_envelope(envelope: Envelope) -> Option<Self>;
 32}
 33
 34pub trait EntityMessage: EnvelopedMessage {
 35    type Entity;
 36    fn remote_entity_id(&self) -> u64;
 37}
 38
 39pub trait RequestMessage: EnvelopedMessage {
 40    type Response: EnvelopedMessage;
 41}
 42
 43pub trait AnyTypedEnvelope: 'static + Send + Sync {
 44    fn payload_type_id(&self) -> TypeId;
 45    fn payload_type_name(&self) -> &'static str;
 46    fn as_any(&self) -> &dyn Any;
 47    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
 48    fn is_background(&self) -> bool;
 49    fn original_sender_id(&self) -> Option<PeerId>;
 50    fn sender_id(&self) -> ConnectionId;
 51    fn message_id(&self) -> u32;
 52}
 53
 54pub enum MessagePriority {
 55    Foreground,
 56    Background,
 57}
 58
 59impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
 60    fn payload_type_id(&self) -> TypeId {
 61        TypeId::of::<T>()
 62    }
 63
 64    fn payload_type_name(&self) -> &'static str {
 65        T::NAME
 66    }
 67
 68    fn as_any(&self) -> &dyn Any {
 69        self
 70    }
 71
 72    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
 73        self
 74    }
 75
 76    fn is_background(&self) -> bool {
 77        matches!(T::PRIORITY, MessagePriority::Background)
 78    }
 79
 80    fn original_sender_id(&self) -> Option<PeerId> {
 81        self.original_sender_id
 82    }
 83
 84    fn sender_id(&self) -> ConnectionId {
 85        self.sender_id
 86    }
 87
 88    fn message_id(&self) -> u32 {
 89        self.message_id
 90    }
 91}
 92
 93impl PeerId {
 94    pub fn from_u64(peer_id: u64) -> Self {
 95        let owner_id = (peer_id >> 32) as u32;
 96        let id = peer_id as u32;
 97        Self { owner_id, id }
 98    }
 99
100    pub fn as_u64(self) -> u64 {
101        ((self.owner_id as u64) << 32) | (self.id as u64)
102    }
103}
104
105impl Copy for PeerId {}
106
107impl Eq for PeerId {}
108
109impl Ord for PeerId {
110    fn cmp(&self, other: &Self) -> cmp::Ordering {
111        self.owner_id
112            .cmp(&other.owner_id)
113            .then_with(|| self.id.cmp(&other.id))
114    }
115}
116
117impl PartialOrd for PeerId {
118    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
119        Some(self.cmp(other))
120    }
121}
122
123impl std::hash::Hash for PeerId {
124    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
125        self.owner_id.hash(state);
126        self.id.hash(state);
127    }
128}
129
130impl fmt::Display for PeerId {
131    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
132        write!(f, "{}/{}", self.owner_id, self.id)
133    }
134}
135
136messages!(
137    (Ack, Foreground),
138    (AckBufferOperation, Background),
139    (AckChannelMessage, Background),
140    (AddNotification, Foreground),
141    (AddProjectCollaborator, Foreground),
142    (ApplyCodeAction, Background),
143    (ApplyCodeActionResponse, Background),
144    (ApplyCompletionAdditionalEdits, Background),
145    (ApplyCompletionAdditionalEditsResponse, Background),
146    (BufferReloaded, Foreground),
147    (BufferSaved, Foreground),
148    (Call, Foreground),
149    (CallCanceled, Foreground),
150    (CancelCall, Foreground),
151    (ChannelMessageSent, Foreground),
152    (ChannelMessageUpdate, Foreground),
153    (CompleteWithLanguageModel, Background),
154    (ComputeEmbeddings, Background),
155    (ComputeEmbeddingsResponse, Background),
156    (CopyProjectEntry, Foreground),
157    (CountTokensWithLanguageModel, Background),
158    (CountTokensResponse, Background),
159    (CreateBufferForPeer, Foreground),
160    (CreateChannel, Foreground),
161    (CreateChannelResponse, Foreground),
162    (CreateProjectEntry, Foreground),
163    (CreateRoom, Foreground),
164    (CreateRoomResponse, Foreground),
165    (DeclineCall, Foreground),
166    (DeleteChannel, Foreground),
167    (DeleteNotification, Foreground),
168    (UpdateNotification, Foreground),
169    (DeleteProjectEntry, Foreground),
170    (EndStream, Foreground),
171    (Error, Foreground),
172    (ExpandProjectEntry, Foreground),
173    (ExpandProjectEntryResponse, Foreground),
174    (Follow, Foreground),
175    (FollowResponse, Foreground),
176    (FormatBuffers, Foreground),
177    (FormatBuffersResponse, Foreground),
178    (FuzzySearchUsers, Foreground),
179    (GetCachedEmbeddings, Background),
180    (GetCachedEmbeddingsResponse, Background),
181    (GetChannelMembers, Foreground),
182    (GetChannelMembersResponse, Foreground),
183    (GetChannelMessages, Background),
184    (GetChannelMessagesById, Background),
185    (GetChannelMessagesResponse, Background),
186    (GetCodeActions, Background),
187    (GetCodeActionsResponse, Background),
188    (GetCompletions, Background),
189    (GetCompletionsResponse, Background),
190    (GetDefinition, Background),
191    (GetDefinitionResponse, Background),
192    (GetDocumentHighlights, Background),
193    (GetDocumentHighlightsResponse, Background),
194    (GetHover, Background),
195    (GetHoverResponse, Background),
196    (GetNotifications, Foreground),
197    (GetNotificationsResponse, Foreground),
198    (GetPrivateUserInfo, Foreground),
199    (GetPrivateUserInfoResponse, Foreground),
200    (GetProjectSymbols, Background),
201    (GetProjectSymbolsResponse, Background),
202    (GetReferences, Background),
203    (GetReferencesResponse, Background),
204    (GetSupermavenApiKey, Background),
205    (GetSupermavenApiKeyResponse, Background),
206    (GetTypeDefinition, Background),
207    (GetTypeDefinitionResponse, Background),
208    (GetImplementation, Background),
209    (GetImplementationResponse, Background),
210    (GetUsers, Foreground),
211    (Hello, Foreground),
212    (IncomingCall, Foreground),
213    (InlayHints, Background),
214    (InlayHintsResponse, Background),
215    (InviteChannelMember, Foreground),
216    (JoinChannel, Foreground),
217    (JoinChannelBuffer, Foreground),
218    (JoinChannelBufferResponse, Foreground),
219    (JoinChannelChat, Foreground),
220    (JoinChannelChatResponse, Foreground),
221    (JoinProject, Foreground),
222    (JoinHostedProject, Foreground),
223    (JoinProjectResponse, Foreground),
224    (JoinRoom, Foreground),
225    (JoinRoomResponse, Foreground),
226    (LanguageModelResponse, Background),
227    (LeaveChannelBuffer, Background),
228    (LeaveChannelChat, Foreground),
229    (LeaveProject, Foreground),
230    (LeaveRoom, Foreground),
231    (MarkNotificationRead, Foreground),
232    (MoveChannel, Foreground),
233    (OnTypeFormatting, Background),
234    (OnTypeFormattingResponse, Background),
235    (OpenBufferById, Background),
236    (OpenBufferByPath, Background),
237    (OpenBufferForSymbol, Background),
238    (OpenBufferForSymbolResponse, Background),
239    (OpenBufferResponse, Background),
240    (PerformRename, Background),
241    (PerformRenameResponse, Background),
242    (Ping, Foreground),
243    (PrepareRename, Background),
244    (PrepareRenameResponse, Background),
245    (ProjectEntryResponse, Foreground),
246    (RefreshInlayHints, Foreground),
247    (RejoinChannelBuffers, Foreground),
248    (RejoinChannelBuffersResponse, Foreground),
249    (RejoinRoom, Foreground),
250    (RejoinRoomResponse, Foreground),
251    (ReloadBuffers, Foreground),
252    (ReloadBuffersResponse, Foreground),
253    (RemoveChannelMember, Foreground),
254    (RemoveChannelMessage, Foreground),
255    (UpdateChannelMessage, Foreground),
256    (RemoveContact, Foreground),
257    (RemoveProjectCollaborator, Foreground),
258    (RenameChannel, Foreground),
259    (RenameChannelResponse, Foreground),
260    (RenameProjectEntry, Foreground),
261    (RequestContact, Foreground),
262    (ResolveCompletionDocumentation, Background),
263    (ResolveCompletionDocumentationResponse, Background),
264    (ResolveInlayHint, Background),
265    (ResolveInlayHintResponse, Background),
266    (RespondToChannelInvite, Foreground),
267    (RespondToContactRequest, Foreground),
268    (RoomUpdated, Foreground),
269    (SaveBuffer, Foreground),
270    (SetChannelMemberRole, Foreground),
271    (SetChannelVisibility, Foreground),
272    (SearchProject, Background),
273    (SearchProjectResponse, Background),
274    (SendChannelMessage, Background),
275    (SendChannelMessageResponse, Background),
276    (ShareProject, Foreground),
277    (ShareProjectResponse, Foreground),
278    (ShowContacts, Foreground),
279    (StartLanguageServer, Foreground),
280    (SynchronizeBuffers, Foreground),
281    (SynchronizeBuffersResponse, Foreground),
282    (TaskContextForLocation, Background),
283    (TaskContext, Background),
284    (TaskTemplates, Background),
285    (TaskTemplatesResponse, Background),
286    (Test, Foreground),
287    (Unfollow, Foreground),
288    (UnshareProject, Foreground),
289    (UpdateBuffer, Foreground),
290    (UpdateBufferFile, Foreground),
291    (UpdateChannelBuffer, Foreground),
292    (UpdateChannelBufferCollaborators, Foreground),
293    (UpdateChannels, Foreground),
294    (UpdateUserChannels, Foreground),
295    (UpdateContacts, Foreground),
296    (UpdateDiagnosticSummary, Foreground),
297    (UpdateDiffBase, Foreground),
298    (UpdateFollowers, Foreground),
299    (UpdateInviteInfo, Foreground),
300    (UpdateLanguageServer, Foreground),
301    (UpdateParticipantLocation, Foreground),
302    (UpdateProject, Foreground),
303    (UpdateProjectCollaborator, Foreground),
304    (UpdateWorktree, Foreground),
305    (UpdateWorktreeSettings, Foreground),
306    (UsersResponse, Foreground),
307    (LspExtExpandMacro, Background),
308    (LspExtExpandMacroResponse, Background),
309    (SetRoomParticipantRole, Foreground),
310    (BlameBuffer, Foreground),
311    (BlameBufferResponse, Foreground),
312    (CreateDevServerProject, Background),
313    (CreateDevServerProjectResponse, Foreground),
314    (CreateDevServer, Foreground),
315    (CreateDevServerResponse, Foreground),
316    (DevServerInstructions, Foreground),
317    (ShutdownDevServer, Foreground),
318    (ReconnectDevServer, Foreground),
319    (ReconnectDevServerResponse, Foreground),
320    (ShareDevServerProject, Foreground),
321    (JoinDevServerProject, Foreground),
322    (RejoinRemoteProjects, Foreground),
323    (RejoinRemoteProjectsResponse, Foreground),
324    (MultiLspQuery, Background),
325    (MultiLspQueryResponse, Background),
326    (DevServerProjectsUpdate, Foreground),
327    (ValidateDevServerProjectRequest, Background),
328    (DeleteDevServer, Foreground),
329    (DeleteDevServerProject, Foreground),
330    (RegenerateDevServerToken, Foreground),
331    (RegenerateDevServerTokenResponse, Foreground),
332    (RenameDevServer, Foreground),
333    (OpenNewBuffer, Foreground),
334);
335
336request_messages!(
337    (ApplyCodeAction, ApplyCodeActionResponse),
338    (
339        ApplyCompletionAdditionalEdits,
340        ApplyCompletionAdditionalEditsResponse
341    ),
342    (Call, Ack),
343    (CancelCall, Ack),
344    (CopyProjectEntry, ProjectEntryResponse),
345    (CompleteWithLanguageModel, LanguageModelResponse),
346    (ComputeEmbeddings, ComputeEmbeddingsResponse),
347    (CountTokensWithLanguageModel, CountTokensResponse),
348    (CreateChannel, CreateChannelResponse),
349    (CreateProjectEntry, ProjectEntryResponse),
350    (CreateRoom, CreateRoomResponse),
351    (DeclineCall, Ack),
352    (DeleteChannel, Ack),
353    (DeleteProjectEntry, ProjectEntryResponse),
354    (ExpandProjectEntry, ExpandProjectEntryResponse),
355    (Follow, FollowResponse),
356    (FormatBuffers, FormatBuffersResponse),
357    (FuzzySearchUsers, UsersResponse),
358    (GetCachedEmbeddings, GetCachedEmbeddingsResponse),
359    (GetChannelMembers, GetChannelMembersResponse),
360    (GetChannelMessages, GetChannelMessagesResponse),
361    (GetChannelMessagesById, GetChannelMessagesResponse),
362    (GetCodeActions, GetCodeActionsResponse),
363    (GetCompletions, GetCompletionsResponse),
364    (GetDefinition, GetDefinitionResponse),
365    (GetImplementation, GetImplementationResponse),
366    (GetDocumentHighlights, GetDocumentHighlightsResponse),
367    (GetHover, GetHoverResponse),
368    (GetNotifications, GetNotificationsResponse),
369    (GetPrivateUserInfo, GetPrivateUserInfoResponse),
370    (GetProjectSymbols, GetProjectSymbolsResponse),
371    (GetReferences, GetReferencesResponse),
372    (GetSupermavenApiKey, GetSupermavenApiKeyResponse),
373    (GetTypeDefinition, GetTypeDefinitionResponse),
374    (GetUsers, UsersResponse),
375    (IncomingCall, Ack),
376    (InlayHints, InlayHintsResponse),
377    (InviteChannelMember, Ack),
378    (JoinChannel, JoinRoomResponse),
379    (JoinChannelBuffer, JoinChannelBufferResponse),
380    (JoinChannelChat, JoinChannelChatResponse),
381    (JoinHostedProject, JoinProjectResponse),
382    (JoinProject, JoinProjectResponse),
383    (JoinRoom, JoinRoomResponse),
384    (LeaveChannelBuffer, Ack),
385    (LeaveRoom, Ack),
386    (MarkNotificationRead, Ack),
387    (MoveChannel, Ack),
388    (OnTypeFormatting, OnTypeFormattingResponse),
389    (OpenBufferById, OpenBufferResponse),
390    (OpenBufferByPath, OpenBufferResponse),
391    (OpenBufferForSymbol, OpenBufferForSymbolResponse),
392    (OpenNewBuffer, OpenBufferResponse),
393    (PerformRename, PerformRenameResponse),
394    (Ping, Ack),
395    (PrepareRename, PrepareRenameResponse),
396    (RefreshInlayHints, Ack),
397    (RejoinChannelBuffers, RejoinChannelBuffersResponse),
398    (RejoinRoom, RejoinRoomResponse),
399    (ReloadBuffers, ReloadBuffersResponse),
400    (RemoveChannelMember, Ack),
401    (RemoveChannelMessage, Ack),
402    (UpdateChannelMessage, Ack),
403    (RemoveContact, Ack),
404    (RenameChannel, RenameChannelResponse),
405    (RenameProjectEntry, ProjectEntryResponse),
406    (RequestContact, Ack),
407    (
408        ResolveCompletionDocumentation,
409        ResolveCompletionDocumentationResponse
410    ),
411    (ResolveInlayHint, ResolveInlayHintResponse),
412    (RespondToChannelInvite, Ack),
413    (RespondToContactRequest, Ack),
414    (SaveBuffer, BufferSaved),
415    (SearchProject, SearchProjectResponse),
416    (SendChannelMessage, SendChannelMessageResponse),
417    (SetChannelMemberRole, Ack),
418    (SetChannelVisibility, Ack),
419    (ShareProject, ShareProjectResponse),
420    (SynchronizeBuffers, SynchronizeBuffersResponse),
421    (TaskContextForLocation, TaskContext),
422    (TaskTemplates, TaskTemplatesResponse),
423    (Test, Test),
424    (UpdateBuffer, Ack),
425    (UpdateParticipantLocation, Ack),
426    (UpdateProject, Ack),
427    (UpdateWorktree, Ack),
428    (LspExtExpandMacro, LspExtExpandMacroResponse),
429    (SetRoomParticipantRole, Ack),
430    (BlameBuffer, BlameBufferResponse),
431    (CreateDevServerProject, CreateDevServerProjectResponse),
432    (CreateDevServer, CreateDevServerResponse),
433    (ShutdownDevServer, Ack),
434    (ShareDevServerProject, ShareProjectResponse),
435    (JoinDevServerProject, JoinProjectResponse),
436    (RejoinRemoteProjects, RejoinRemoteProjectsResponse),
437    (ReconnectDevServer, ReconnectDevServerResponse),
438    (ValidateDevServerProjectRequest, Ack),
439    (MultiLspQuery, MultiLspQueryResponse),
440    (DeleteDevServer, Ack),
441    (DeleteDevServerProject, Ack),
442    (RegenerateDevServerToken, RegenerateDevServerTokenResponse),
443    (RenameDevServer, Ack)
444);
445
446entity_messages!(
447    {project_id, ShareProject},
448    AddProjectCollaborator,
449    ApplyCodeAction,
450    ApplyCompletionAdditionalEdits,
451    BlameBuffer,
452    BufferReloaded,
453    BufferSaved,
454    CopyProjectEntry,
455    CreateBufferForPeer,
456    CreateProjectEntry,
457    DeleteProjectEntry,
458    ExpandProjectEntry,
459    FormatBuffers,
460    GetCodeActions,
461    GetCompletions,
462    GetDefinition,
463    GetImplementation,
464    GetDocumentHighlights,
465    GetHover,
466    GetProjectSymbols,
467    GetReferences,
468    GetTypeDefinition,
469    InlayHints,
470    JoinProject,
471    LeaveProject,
472    MultiLspQuery,
473    OnTypeFormatting,
474    OpenNewBuffer,
475    OpenBufferById,
476    OpenBufferByPath,
477    OpenBufferForSymbol,
478    PerformRename,
479    PrepareRename,
480    RefreshInlayHints,
481    ReloadBuffers,
482    RemoveProjectCollaborator,
483    RenameProjectEntry,
484    ResolveCompletionDocumentation,
485    ResolveInlayHint,
486    SaveBuffer,
487    SearchProject,
488    StartLanguageServer,
489    SynchronizeBuffers,
490    TaskContextForLocation,
491    TaskTemplates,
492    UnshareProject,
493    UpdateBuffer,
494    UpdateBufferFile,
495    UpdateDiagnosticSummary,
496    UpdateDiffBase,
497    UpdateLanguageServer,
498    UpdateProject,
499    UpdateProjectCollaborator,
500    UpdateWorktree,
501    UpdateWorktreeSettings,
502    LspExtExpandMacro,
503);
504
505entity_messages!(
506    {channel_id, Channel},
507    ChannelMessageSent,
508    ChannelMessageUpdate,
509    RemoveChannelMessage,
510    UpdateChannelMessage,
511    UpdateChannelBuffer,
512    UpdateChannelBufferCollaborators,
513);
514
515const KIB: usize = 1024;
516const MIB: usize = KIB * 1024;
517const MAX_BUFFER_LEN: usize = MIB;
518
519/// A stream of protobuf messages.
520pub struct MessageStream<S> {
521    stream: S,
522    encoding_buffer: Vec<u8>,
523}
524
525#[allow(clippy::large_enum_variant)]
526#[derive(Debug)]
527pub enum Message {
528    Envelope(Envelope),
529    Ping,
530    Pong,
531}
532
533impl<S> MessageStream<S> {
534    pub fn new(stream: S) -> Self {
535        Self {
536            stream,
537            encoding_buffer: Vec::new(),
538        }
539    }
540
541    pub fn inner_mut(&mut self) -> &mut S {
542        &mut self.stream
543    }
544}
545
546impl<S> MessageStream<S>
547where
548    S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
549{
550    pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
551        #[cfg(any(test, feature = "test-support"))]
552        const COMPRESSION_LEVEL: i32 = -7;
553
554        #[cfg(not(any(test, feature = "test-support")))]
555        const COMPRESSION_LEVEL: i32 = 4;
556
557        match message {
558            Message::Envelope(message) => {
559                self.encoding_buffer.reserve(message.encoded_len());
560                message
561                    .encode(&mut self.encoding_buffer)
562                    .map_err(io::Error::from)?;
563                let buffer =
564                    zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
565                        .unwrap();
566
567                self.encoding_buffer.clear();
568                self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
569                self.stream.send(WebSocketMessage::Binary(buffer)).await?;
570            }
571            Message::Ping => {
572                self.stream
573                    .send(WebSocketMessage::Ping(Default::default()))
574                    .await?;
575            }
576            Message::Pong => {
577                self.stream
578                    .send(WebSocketMessage::Pong(Default::default()))
579                    .await?;
580            }
581        }
582
583        Ok(())
584    }
585}
586
587impl<S> MessageStream<S>
588where
589    S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
590{
591    pub async fn read(&mut self) -> Result<(Message, Instant), anyhow::Error> {
592        while let Some(bytes) = self.stream.next().await {
593            let received_at = Instant::now();
594            match bytes? {
595                WebSocketMessage::Binary(bytes) => {
596                    zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
597                    let envelope = Envelope::decode(self.encoding_buffer.as_slice())
598                        .map_err(io::Error::from)?;
599
600                    self.encoding_buffer.clear();
601                    self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
602                    return Ok((Message::Envelope(envelope), received_at));
603                }
604                WebSocketMessage::Ping(_) => return Ok((Message::Ping, received_at)),
605                WebSocketMessage::Pong(_) => return Ok((Message::Pong, received_at)),
606                WebSocketMessage::Close(_) => break,
607                _ => {}
608            }
609        }
610        Err(anyhow!("connection closed"))
611    }
612}
613
614impl From<Timestamp> for SystemTime {
615    fn from(val: Timestamp) -> Self {
616        UNIX_EPOCH
617            .checked_add(Duration::new(val.seconds, val.nanos))
618            .unwrap()
619    }
620}
621
622impl From<SystemTime> for Timestamp {
623    fn from(time: SystemTime) -> Self {
624        let duration = time.duration_since(UNIX_EPOCH).unwrap();
625        Self {
626            seconds: duration.as_secs(),
627            nanos: duration.subsec_nanos(),
628        }
629    }
630}
631
632impl From<u128> for Nonce {
633    fn from(nonce: u128) -> Self {
634        let upper_half = (nonce >> 64) as u64;
635        let lower_half = nonce as u64;
636        Self {
637            upper_half,
638            lower_half,
639        }
640    }
641}
642
643impl From<Nonce> for u128 {
644    fn from(nonce: Nonce) -> Self {
645        let upper_half = (nonce.upper_half as u128) << 64;
646        let lower_half = nonce.lower_half as u128;
647        upper_half | lower_half
648    }
649}
650
651pub fn split_worktree_update(
652    mut message: UpdateWorktree,
653    max_chunk_size: usize,
654) -> impl Iterator<Item = UpdateWorktree> {
655    let mut done_files = false;
656
657    let mut repository_map = message
658        .updated_repositories
659        .into_iter()
660        .map(|repo| (repo.work_directory_id, repo))
661        .collect::<HashMap<_, _>>();
662
663    iter::from_fn(move || {
664        if done_files {
665            return None;
666        }
667
668        let updated_entries_chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
669        let updated_entries: Vec<_> = message
670            .updated_entries
671            .drain(..updated_entries_chunk_size)
672            .collect();
673
674        let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size);
675        let removed_entries = message
676            .removed_entries
677            .drain(..removed_entries_chunk_size)
678            .collect();
679
680        done_files = message.updated_entries.is_empty() && message.removed_entries.is_empty();
681
682        let mut updated_repositories = Vec::new();
683
684        if !repository_map.is_empty() {
685            for entry in &updated_entries {
686                if let Some(repo) = repository_map.remove(&entry.id) {
687                    updated_repositories.push(repo)
688                }
689            }
690        }
691
692        let removed_repositories = if done_files {
693            mem::take(&mut message.removed_repositories)
694        } else {
695            Default::default()
696        };
697
698        if done_files {
699            updated_repositories.extend(mem::take(&mut repository_map).into_values());
700        }
701
702        Some(UpdateWorktree {
703            project_id: message.project_id,
704            worktree_id: message.worktree_id,
705            root_name: message.root_name.clone(),
706            abs_path: message.abs_path.clone(),
707            updated_entries,
708            removed_entries,
709            scan_id: message.scan_id,
710            is_last_update: done_files && message.is_last_update,
711            updated_repositories,
712            removed_repositories,
713        })
714    })
715}
716
717#[cfg(test)]
718mod tests {
719    use super::*;
720
721    #[gpui::test]
722    async fn test_buffer_size() {
723        let (tx, rx) = futures::channel::mpsc::unbounded();
724        let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
725        sink.write(Message::Envelope(Envelope {
726            payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
727                root_name: "abcdefg".repeat(10),
728                ..Default::default()
729            })),
730            ..Default::default()
731        }))
732        .await
733        .unwrap();
734        assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
735        sink.write(Message::Envelope(Envelope {
736            payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
737                root_name: "abcdefg".repeat(1000000),
738                ..Default::default()
739            })),
740            ..Default::default()
741        }))
742        .await
743        .unwrap();
744        assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
745
746        let mut stream = MessageStream::new(rx.map(anyhow::Ok));
747        stream.read().await.unwrap();
748        assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
749        stream.read().await.unwrap();
750        assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
751    }
752
753    #[gpui::test]
754    fn test_converting_peer_id_from_and_to_u64() {
755        let peer_id = PeerId {
756            owner_id: 10,
757            id: 3,
758        };
759        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
760        let peer_id = PeerId {
761            owner_id: u32::MAX,
762            id: 3,
763        };
764        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
765        let peer_id = PeerId {
766            owner_id: 10,
767            id: u32::MAX,
768        };
769        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
770        let peer_id = PeerId {
771            owner_id: u32::MAX,
772            id: u32::MAX,
773        };
774        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
775    }
776}