proto.rs

  1#![allow(non_snake_case)]
  2
  3use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
  4use anyhow::{anyhow, Result};
  5use async_tungstenite::tungstenite::Message as WebSocketMessage;
  6use collections::HashMap;
  7use futures::{SinkExt as _, StreamExt as _};
  8use prost::Message as _;
  9use serde::Serialize;
 10use std::any::{Any, TypeId};
 11use std::time::Instant;
 12use std::{
 13    cmp,
 14    fmt::Debug,
 15    io, iter,
 16    time::{Duration, SystemTime, UNIX_EPOCH},
 17};
 18use std::{fmt, mem};
 19
 20include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
 21
 22pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
 23    const NAME: &'static str;
 24    const PRIORITY: MessagePriority;
 25    fn into_envelope(
 26        self,
 27        id: u32,
 28        responding_to: Option<u32>,
 29        original_sender_id: Option<PeerId>,
 30    ) -> Envelope;
 31    fn from_envelope(envelope: Envelope) -> Option<Self>;
 32}
 33
 34pub trait EntityMessage: EnvelopedMessage {
 35    type Entity;
 36    fn remote_entity_id(&self) -> u64;
 37}
 38
 39pub trait RequestMessage: EnvelopedMessage {
 40    type Response: EnvelopedMessage;
 41}
 42
 43pub trait AnyTypedEnvelope: 'static + Send + Sync {
 44    fn payload_type_id(&self) -> TypeId;
 45    fn payload_type_name(&self) -> &'static str;
 46    fn as_any(&self) -> &dyn Any;
 47    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
 48    fn is_background(&self) -> bool;
 49    fn original_sender_id(&self) -> Option<PeerId>;
 50    fn sender_id(&self) -> ConnectionId;
 51    fn message_id(&self) -> u32;
 52}
 53
 54pub enum MessagePriority {
 55    Foreground,
 56    Background,
 57}
 58
 59impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
 60    fn payload_type_id(&self) -> TypeId {
 61        TypeId::of::<T>()
 62    }
 63
 64    fn payload_type_name(&self) -> &'static str {
 65        T::NAME
 66    }
 67
 68    fn as_any(&self) -> &dyn Any {
 69        self
 70    }
 71
 72    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
 73        self
 74    }
 75
 76    fn is_background(&self) -> bool {
 77        matches!(T::PRIORITY, MessagePriority::Background)
 78    }
 79
 80    fn original_sender_id(&self) -> Option<PeerId> {
 81        self.original_sender_id
 82    }
 83
 84    fn sender_id(&self) -> ConnectionId {
 85        self.sender_id
 86    }
 87
 88    fn message_id(&self) -> u32 {
 89        self.message_id
 90    }
 91}
 92
 93impl PeerId {
 94    pub fn from_u64(peer_id: u64) -> Self {
 95        let owner_id = (peer_id >> 32) as u32;
 96        let id = peer_id as u32;
 97        Self { owner_id, id }
 98    }
 99
100    pub fn as_u64(self) -> u64 {
101        ((self.owner_id as u64) << 32) | (self.id as u64)
102    }
103}
104
105impl Copy for PeerId {}
106
107impl Eq for PeerId {}
108
109impl Ord for PeerId {
110    fn cmp(&self, other: &Self) -> cmp::Ordering {
111        self.owner_id
112            .cmp(&other.owner_id)
113            .then_with(|| self.id.cmp(&other.id))
114    }
115}
116
117impl PartialOrd for PeerId {
118    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
119        Some(self.cmp(other))
120    }
121}
122
123impl std::hash::Hash for PeerId {
124    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
125        self.owner_id.hash(state);
126        self.id.hash(state);
127    }
128}
129
130impl fmt::Display for PeerId {
131    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
132        write!(f, "{}/{}", self.owner_id, self.id)
133    }
134}
135
136messages!(
137    (Ack, Foreground),
138    (AckBufferOperation, Background),
139    (AckChannelMessage, Background),
140    (AddNotification, Foreground),
141    (AddProjectCollaborator, Foreground),
142    (ApplyCodeAction, Background),
143    (ApplyCodeActionResponse, Background),
144    (ApplyCompletionAdditionalEdits, Background),
145    (ApplyCompletionAdditionalEditsResponse, Background),
146    (BufferReloaded, Foreground),
147    (BufferSaved, Foreground),
148    (Call, Foreground),
149    (CallCanceled, Foreground),
150    (CancelCall, Foreground),
151    (ChannelMessageSent, Foreground),
152    (ChannelMessageUpdate, Foreground),
153    (CompleteWithLanguageModel, Background),
154    (ComputeEmbeddings, Background),
155    (ComputeEmbeddingsResponse, Background),
156    (CopyProjectEntry, Foreground),
157    (CountTokensWithLanguageModel, Background),
158    (CountTokensResponse, Background),
159    (CreateBufferForPeer, Foreground),
160    (CreateChannel, Foreground),
161    (CreateChannelResponse, Foreground),
162    (CreateProjectEntry, Foreground),
163    (CreateRoom, Foreground),
164    (CreateRoomResponse, Foreground),
165    (DeclineCall, Foreground),
166    (DeleteChannel, Foreground),
167    (DeleteNotification, Foreground),
168    (UpdateNotification, Foreground),
169    (DeleteProjectEntry, Foreground),
170    (EndStream, Foreground),
171    (Error, Foreground),
172    (ExpandProjectEntry, Foreground),
173    (ExpandProjectEntryResponse, Foreground),
174    (Follow, Foreground),
175    (FollowResponse, Foreground),
176    (FormatBuffers, Foreground),
177    (FormatBuffersResponse, Foreground),
178    (FuzzySearchUsers, Foreground),
179    (GetCachedEmbeddings, Background),
180    (GetCachedEmbeddingsResponse, Background),
181    (GetChannelMembers, Foreground),
182    (GetChannelMembersResponse, Foreground),
183    (GetChannelMessages, Background),
184    (GetChannelMessagesById, Background),
185    (GetChannelMessagesResponse, Background),
186    (GetCodeActions, Background),
187    (GetCodeActionsResponse, Background),
188    (GetCompletions, Background),
189    (GetCompletionsResponse, Background),
190    (GetDefinition, Background),
191    (GetDefinitionResponse, Background),
192    (GetDocumentHighlights, Background),
193    (GetDocumentHighlightsResponse, Background),
194    (GetHover, Background),
195    (GetHoverResponse, Background),
196    (GetNotifications, Foreground),
197    (GetNotificationsResponse, Foreground),
198    (GetPrivateUserInfo, Foreground),
199    (GetPrivateUserInfoResponse, Foreground),
200    (GetProjectSymbols, Background),
201    (GetProjectSymbolsResponse, Background),
202    (GetReferences, Background),
203    (GetReferencesResponse, Background),
204    (GetTypeDefinition, Background),
205    (GetTypeDefinitionResponse, Background),
206    (GetImplementation, Background),
207    (GetImplementationResponse, Background),
208    (GetUsers, Foreground),
209    (Hello, Foreground),
210    (IncomingCall, Foreground),
211    (InlayHints, Background),
212    (InlayHintsResponse, Background),
213    (InviteChannelMember, Foreground),
214    (JoinChannel, Foreground),
215    (JoinChannelBuffer, Foreground),
216    (JoinChannelBufferResponse, Foreground),
217    (JoinChannelChat, Foreground),
218    (JoinChannelChatResponse, Foreground),
219    (JoinProject, Foreground),
220    (JoinHostedProject, Foreground),
221    (JoinProjectResponse, Foreground),
222    (JoinRoom, Foreground),
223    (JoinRoomResponse, Foreground),
224    (LanguageModelResponse, Background),
225    (LeaveChannelBuffer, Background),
226    (LeaveChannelChat, Foreground),
227    (LeaveProject, Foreground),
228    (LeaveRoom, Foreground),
229    (MarkNotificationRead, Foreground),
230    (MoveChannel, Foreground),
231    (OnTypeFormatting, Background),
232    (OnTypeFormattingResponse, Background),
233    (OpenBufferById, Background),
234    (OpenBufferByPath, Background),
235    (OpenBufferForSymbol, Background),
236    (OpenBufferForSymbolResponse, Background),
237    (OpenBufferResponse, Background),
238    (PerformRename, Background),
239    (PerformRenameResponse, Background),
240    (Ping, Foreground),
241    (PrepareRename, Background),
242    (PrepareRenameResponse, Background),
243    (ProjectEntryResponse, Foreground),
244    (RefreshInlayHints, Foreground),
245    (RejoinChannelBuffers, Foreground),
246    (RejoinChannelBuffersResponse, Foreground),
247    (RejoinRoom, Foreground),
248    (RejoinRoomResponse, Foreground),
249    (ReloadBuffers, Foreground),
250    (ReloadBuffersResponse, Foreground),
251    (RemoveChannelMember, Foreground),
252    (RemoveChannelMessage, Foreground),
253    (UpdateChannelMessage, Foreground),
254    (RemoveContact, Foreground),
255    (RemoveProjectCollaborator, Foreground),
256    (RenameChannel, Foreground),
257    (RenameChannelResponse, Foreground),
258    (RenameProjectEntry, Foreground),
259    (RequestContact, Foreground),
260    (ResolveCompletionDocumentation, Background),
261    (ResolveCompletionDocumentationResponse, Background),
262    (ResolveInlayHint, Background),
263    (ResolveInlayHintResponse, Background),
264    (RespondToChannelInvite, Foreground),
265    (RespondToContactRequest, Foreground),
266    (RoomUpdated, Foreground),
267    (SaveBuffer, Foreground),
268    (SetChannelMemberRole, Foreground),
269    (SetChannelVisibility, Foreground),
270    (SearchProject, Background),
271    (SearchProjectResponse, Background),
272    (SendChannelMessage, Background),
273    (SendChannelMessageResponse, Background),
274    (ShareProject, Foreground),
275    (ShareProjectResponse, Foreground),
276    (ShowContacts, Foreground),
277    (StartLanguageServer, Foreground),
278    (SynchronizeBuffers, Foreground),
279    (SynchronizeBuffersResponse, Foreground),
280    (Test, Foreground),
281    (Unfollow, Foreground),
282    (UnshareProject, Foreground),
283    (UpdateBuffer, Foreground),
284    (UpdateBufferFile, Foreground),
285    (UpdateChannelBuffer, Foreground),
286    (UpdateChannelBufferCollaborators, Foreground),
287    (UpdateChannels, Foreground),
288    (UpdateUserChannels, Foreground),
289    (UpdateContacts, Foreground),
290    (UpdateDiagnosticSummary, Foreground),
291    (UpdateDiffBase, Foreground),
292    (UpdateFollowers, Foreground),
293    (UpdateInviteInfo, Foreground),
294    (UpdateLanguageServer, Foreground),
295    (UpdateParticipantLocation, Foreground),
296    (UpdateProject, Foreground),
297    (UpdateProjectCollaborator, Foreground),
298    (UpdateWorktree, Foreground),
299    (UpdateWorktreeSettings, Foreground),
300    (UsersResponse, Foreground),
301    (LspExtExpandMacro, Background),
302    (LspExtExpandMacroResponse, Background),
303    (SetRoomParticipantRole, Foreground),
304    (BlameBuffer, Foreground),
305    (BlameBufferResponse, Foreground),
306    (CreateDevServerProject, Background),
307    (CreateDevServerProjectResponse, Foreground),
308    (CreateDevServer, Foreground),
309    (CreateDevServerResponse, Foreground),
310    (DevServerInstructions, Foreground),
311    (ShutdownDevServer, Foreground),
312    (ReconnectDevServer, Foreground),
313    (ReconnectDevServerResponse, Foreground),
314    (ShareDevServerProject, Foreground),
315    (JoinDevServerProject, Foreground),
316    (RejoinRemoteProjects, Foreground),
317    (RejoinRemoteProjectsResponse, Foreground),
318    (MultiLspQuery, Background),
319    (MultiLspQueryResponse, Background),
320    (DevServerProjectsUpdate, Foreground),
321    (ValidateDevServerProjectRequest, Background),
322    (DeleteDevServer, Foreground),
323    (DeleteDevServerProject, Foreground),
324    (OpenNewBuffer, Foreground)
325);
326
327request_messages!(
328    (ApplyCodeAction, ApplyCodeActionResponse),
329    (
330        ApplyCompletionAdditionalEdits,
331        ApplyCompletionAdditionalEditsResponse
332    ),
333    (Call, Ack),
334    (CancelCall, Ack),
335    (CopyProjectEntry, ProjectEntryResponse),
336    (CompleteWithLanguageModel, LanguageModelResponse),
337    (ComputeEmbeddings, ComputeEmbeddingsResponse),
338    (CountTokensWithLanguageModel, CountTokensResponse),
339    (CreateChannel, CreateChannelResponse),
340    (CreateProjectEntry, ProjectEntryResponse),
341    (CreateRoom, CreateRoomResponse),
342    (DeclineCall, Ack),
343    (DeleteChannel, Ack),
344    (DeleteProjectEntry, ProjectEntryResponse),
345    (ExpandProjectEntry, ExpandProjectEntryResponse),
346    (Follow, FollowResponse),
347    (FormatBuffers, FormatBuffersResponse),
348    (FuzzySearchUsers, UsersResponse),
349    (GetCachedEmbeddings, GetCachedEmbeddingsResponse),
350    (GetChannelMembers, GetChannelMembersResponse),
351    (GetChannelMessages, GetChannelMessagesResponse),
352    (GetChannelMessagesById, GetChannelMessagesResponse),
353    (GetCodeActions, GetCodeActionsResponse),
354    (GetCompletions, GetCompletionsResponse),
355    (GetDefinition, GetDefinitionResponse),
356    (GetImplementation, GetImplementationResponse),
357    (GetDocumentHighlights, GetDocumentHighlightsResponse),
358    (GetHover, GetHoverResponse),
359    (GetNotifications, GetNotificationsResponse),
360    (GetPrivateUserInfo, GetPrivateUserInfoResponse),
361    (GetProjectSymbols, GetProjectSymbolsResponse),
362    (GetReferences, GetReferencesResponse),
363    (GetTypeDefinition, GetTypeDefinitionResponse),
364    (GetUsers, UsersResponse),
365    (IncomingCall, Ack),
366    (InlayHints, InlayHintsResponse),
367    (InviteChannelMember, Ack),
368    (JoinChannel, JoinRoomResponse),
369    (JoinChannelBuffer, JoinChannelBufferResponse),
370    (JoinChannelChat, JoinChannelChatResponse),
371    (JoinHostedProject, JoinProjectResponse),
372    (JoinProject, JoinProjectResponse),
373    (JoinRoom, JoinRoomResponse),
374    (LeaveChannelBuffer, Ack),
375    (LeaveRoom, Ack),
376    (MarkNotificationRead, Ack),
377    (MoveChannel, Ack),
378    (OnTypeFormatting, OnTypeFormattingResponse),
379    (OpenBufferById, OpenBufferResponse),
380    (OpenBufferByPath, OpenBufferResponse),
381    (OpenBufferForSymbol, OpenBufferForSymbolResponse),
382    (OpenNewBuffer, OpenBufferResponse),
383    (PerformRename, PerformRenameResponse),
384    (Ping, Ack),
385    (PrepareRename, PrepareRenameResponse),
386    (RefreshInlayHints, Ack),
387    (RejoinChannelBuffers, RejoinChannelBuffersResponse),
388    (RejoinRoom, RejoinRoomResponse),
389    (ReloadBuffers, ReloadBuffersResponse),
390    (RemoveChannelMember, Ack),
391    (RemoveChannelMessage, Ack),
392    (UpdateChannelMessage, Ack),
393    (RemoveContact, Ack),
394    (RenameChannel, RenameChannelResponse),
395    (RenameProjectEntry, ProjectEntryResponse),
396    (RequestContact, Ack),
397    (
398        ResolveCompletionDocumentation,
399        ResolveCompletionDocumentationResponse
400    ),
401    (ResolveInlayHint, ResolveInlayHintResponse),
402    (RespondToChannelInvite, Ack),
403    (RespondToContactRequest, Ack),
404    (SaveBuffer, BufferSaved),
405    (SearchProject, SearchProjectResponse),
406    (SendChannelMessage, SendChannelMessageResponse),
407    (SetChannelMemberRole, Ack),
408    (SetChannelVisibility, Ack),
409    (ShareProject, ShareProjectResponse),
410    (SynchronizeBuffers, SynchronizeBuffersResponse),
411    (Test, Test),
412    (UpdateBuffer, Ack),
413    (UpdateParticipantLocation, Ack),
414    (UpdateProject, Ack),
415    (UpdateWorktree, Ack),
416    (LspExtExpandMacro, LspExtExpandMacroResponse),
417    (SetRoomParticipantRole, Ack),
418    (BlameBuffer, BlameBufferResponse),
419    (CreateDevServerProject, CreateDevServerProjectResponse),
420    (CreateDevServer, CreateDevServerResponse),
421    (ShutdownDevServer, Ack),
422    (ShareDevServerProject, ShareProjectResponse),
423    (JoinDevServerProject, JoinProjectResponse),
424    (RejoinRemoteProjects, RejoinRemoteProjectsResponse),
425    (ReconnectDevServer, ReconnectDevServerResponse),
426    (ValidateDevServerProjectRequest, Ack),
427    (MultiLspQuery, MultiLspQueryResponse),
428    (DeleteDevServer, Ack),
429    (DeleteDevServerProject, Ack),
430);
431
432entity_messages!(
433    {project_id, ShareProject},
434    AddProjectCollaborator,
435    ApplyCodeAction,
436    ApplyCompletionAdditionalEdits,
437    BlameBuffer,
438    BufferReloaded,
439    BufferSaved,
440    CopyProjectEntry,
441    CreateBufferForPeer,
442    CreateProjectEntry,
443    DeleteProjectEntry,
444    ExpandProjectEntry,
445    FormatBuffers,
446    GetCodeActions,
447    GetCompletions,
448    GetDefinition,
449    GetImplementation,
450    GetDocumentHighlights,
451    GetHover,
452    GetProjectSymbols,
453    GetReferences,
454    GetTypeDefinition,
455    InlayHints,
456    JoinProject,
457    LeaveProject,
458    MultiLspQuery,
459    OnTypeFormatting,
460    OpenNewBuffer,
461    OpenBufferById,
462    OpenBufferByPath,
463    OpenBufferForSymbol,
464    PerformRename,
465    PrepareRename,
466    RefreshInlayHints,
467    ReloadBuffers,
468    RemoveProjectCollaborator,
469    RenameProjectEntry,
470    ResolveCompletionDocumentation,
471    ResolveInlayHint,
472    SaveBuffer,
473    SearchProject,
474    StartLanguageServer,
475    SynchronizeBuffers,
476    UnshareProject,
477    UpdateBuffer,
478    UpdateBufferFile,
479    UpdateDiagnosticSummary,
480    UpdateDiffBase,
481    UpdateLanguageServer,
482    UpdateProject,
483    UpdateProjectCollaborator,
484    UpdateWorktree,
485    UpdateWorktreeSettings,
486    LspExtExpandMacro,
487);
488
489entity_messages!(
490    {channel_id, Channel},
491    ChannelMessageSent,
492    ChannelMessageUpdate,
493    RemoveChannelMessage,
494    UpdateChannelMessage,
495    UpdateChannelBuffer,
496    UpdateChannelBufferCollaborators,
497);
498
499const KIB: usize = 1024;
500const MIB: usize = KIB * 1024;
501const MAX_BUFFER_LEN: usize = MIB;
502
503/// A stream of protobuf messages.
504pub struct MessageStream<S> {
505    stream: S,
506    encoding_buffer: Vec<u8>,
507}
508
509#[allow(clippy::large_enum_variant)]
510#[derive(Debug)]
511pub enum Message {
512    Envelope(Envelope),
513    Ping,
514    Pong,
515}
516
517impl<S> MessageStream<S> {
518    pub fn new(stream: S) -> Self {
519        Self {
520            stream,
521            encoding_buffer: Vec::new(),
522        }
523    }
524
525    pub fn inner_mut(&mut self) -> &mut S {
526        &mut self.stream
527    }
528}
529
530impl<S> MessageStream<S>
531where
532    S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
533{
534    pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
535        #[cfg(any(test, feature = "test-support"))]
536        const COMPRESSION_LEVEL: i32 = -7;
537
538        #[cfg(not(any(test, feature = "test-support")))]
539        const COMPRESSION_LEVEL: i32 = 4;
540
541        match message {
542            Message::Envelope(message) => {
543                self.encoding_buffer.reserve(message.encoded_len());
544                message
545                    .encode(&mut self.encoding_buffer)
546                    .map_err(io::Error::from)?;
547                let buffer =
548                    zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
549                        .unwrap();
550
551                self.encoding_buffer.clear();
552                self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
553                self.stream.send(WebSocketMessage::Binary(buffer)).await?;
554            }
555            Message::Ping => {
556                self.stream
557                    .send(WebSocketMessage::Ping(Default::default()))
558                    .await?;
559            }
560            Message::Pong => {
561                self.stream
562                    .send(WebSocketMessage::Pong(Default::default()))
563                    .await?;
564            }
565        }
566
567        Ok(())
568    }
569}
570
571impl<S> MessageStream<S>
572where
573    S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
574{
575    pub async fn read(&mut self) -> Result<(Message, Instant), anyhow::Error> {
576        while let Some(bytes) = self.stream.next().await {
577            let received_at = Instant::now();
578            match bytes? {
579                WebSocketMessage::Binary(bytes) => {
580                    zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
581                    let envelope = Envelope::decode(self.encoding_buffer.as_slice())
582                        .map_err(io::Error::from)?;
583
584                    self.encoding_buffer.clear();
585                    self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
586                    return Ok((Message::Envelope(envelope), received_at));
587                }
588                WebSocketMessage::Ping(_) => return Ok((Message::Ping, received_at)),
589                WebSocketMessage::Pong(_) => return Ok((Message::Pong, received_at)),
590                WebSocketMessage::Close(_) => break,
591                _ => {}
592            }
593        }
594        Err(anyhow!("connection closed"))
595    }
596}
597
598impl From<Timestamp> for SystemTime {
599    fn from(val: Timestamp) -> Self {
600        UNIX_EPOCH
601            .checked_add(Duration::new(val.seconds, val.nanos))
602            .unwrap()
603    }
604}
605
606impl From<SystemTime> for Timestamp {
607    fn from(time: SystemTime) -> Self {
608        let duration = time.duration_since(UNIX_EPOCH).unwrap();
609        Self {
610            seconds: duration.as_secs(),
611            nanos: duration.subsec_nanos(),
612        }
613    }
614}
615
616impl From<u128> for Nonce {
617    fn from(nonce: u128) -> Self {
618        let upper_half = (nonce >> 64) as u64;
619        let lower_half = nonce as u64;
620        Self {
621            upper_half,
622            lower_half,
623        }
624    }
625}
626
627impl From<Nonce> for u128 {
628    fn from(nonce: Nonce) -> Self {
629        let upper_half = (nonce.upper_half as u128) << 64;
630        let lower_half = nonce.lower_half as u128;
631        upper_half | lower_half
632    }
633}
634
635pub fn split_worktree_update(
636    mut message: UpdateWorktree,
637    max_chunk_size: usize,
638) -> impl Iterator<Item = UpdateWorktree> {
639    let mut done_files = false;
640
641    let mut repository_map = message
642        .updated_repositories
643        .into_iter()
644        .map(|repo| (repo.work_directory_id, repo))
645        .collect::<HashMap<_, _>>();
646
647    iter::from_fn(move || {
648        if done_files {
649            return None;
650        }
651
652        let updated_entries_chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
653        let updated_entries: Vec<_> = message
654            .updated_entries
655            .drain(..updated_entries_chunk_size)
656            .collect();
657
658        let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size);
659        let removed_entries = message
660            .removed_entries
661            .drain(..removed_entries_chunk_size)
662            .collect();
663
664        done_files = message.updated_entries.is_empty() && message.removed_entries.is_empty();
665
666        let mut updated_repositories = Vec::new();
667
668        if !repository_map.is_empty() {
669            for entry in &updated_entries {
670                if let Some(repo) = repository_map.remove(&entry.id) {
671                    updated_repositories.push(repo)
672                }
673            }
674        }
675
676        let removed_repositories = if done_files {
677            mem::take(&mut message.removed_repositories)
678        } else {
679            Default::default()
680        };
681
682        if done_files {
683            updated_repositories.extend(mem::take(&mut repository_map).into_values());
684        }
685
686        Some(UpdateWorktree {
687            project_id: message.project_id,
688            worktree_id: message.worktree_id,
689            root_name: message.root_name.clone(),
690            abs_path: message.abs_path.clone(),
691            updated_entries,
692            removed_entries,
693            scan_id: message.scan_id,
694            is_last_update: done_files && message.is_last_update,
695            updated_repositories,
696            removed_repositories,
697        })
698    })
699}
700
701#[cfg(test)]
702mod tests {
703    use super::*;
704
705    #[gpui::test]
706    async fn test_buffer_size() {
707        let (tx, rx) = futures::channel::mpsc::unbounded();
708        let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
709        sink.write(Message::Envelope(Envelope {
710            payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
711                root_name: "abcdefg".repeat(10),
712                ..Default::default()
713            })),
714            ..Default::default()
715        }))
716        .await
717        .unwrap();
718        assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
719        sink.write(Message::Envelope(Envelope {
720            payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
721                root_name: "abcdefg".repeat(1000000),
722                ..Default::default()
723            })),
724            ..Default::default()
725        }))
726        .await
727        .unwrap();
728        assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
729
730        let mut stream = MessageStream::new(rx.map(anyhow::Ok));
731        stream.read().await.unwrap();
732        assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
733        stream.read().await.unwrap();
734        assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
735    }
736
737    #[gpui::test]
738    fn test_converting_peer_id_from_and_to_u64() {
739        let peer_id = PeerId {
740            owner_id: 10,
741            id: 3,
742        };
743        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
744        let peer_id = PeerId {
745            owner_id: u32::MAX,
746            id: 3,
747        };
748        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
749        let peer_id = PeerId {
750            owner_id: 10,
751            id: u32::MAX,
752        };
753        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
754        let peer_id = PeerId {
755            owner_id: u32::MAX,
756            id: u32::MAX,
757        };
758        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
759    }
760}