proto.rs

  1#![allow(non_snake_case)]
  2
  3use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
  4use anyhow::{anyhow, Result};
  5use async_tungstenite::tungstenite::Message as WebSocketMessage;
  6use collections::HashMap;
  7use futures::{SinkExt as _, StreamExt as _};
  8use prost::Message as _;
  9use serde::Serialize;
 10use std::any::{Any, TypeId};
 11use std::time::Instant;
 12use std::{
 13    cmp,
 14    fmt::Debug,
 15    io, iter,
 16    time::{Duration, SystemTime, UNIX_EPOCH},
 17};
 18use std::{fmt, mem};
 19
 20include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
 21
 22pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
 23    const NAME: &'static str;
 24    const PRIORITY: MessagePriority;
 25    fn into_envelope(
 26        self,
 27        id: u32,
 28        responding_to: Option<u32>,
 29        original_sender_id: Option<PeerId>,
 30    ) -> Envelope;
 31    fn from_envelope(envelope: Envelope) -> Option<Self>;
 32}
 33
 34pub trait EntityMessage: EnvelopedMessage {
 35    type Entity;
 36    fn remote_entity_id(&self) -> u64;
 37}
 38
 39pub trait RequestMessage: EnvelopedMessage {
 40    type Response: EnvelopedMessage;
 41}
 42
 43pub trait AnyTypedEnvelope: 'static + Send + Sync {
 44    fn payload_type_id(&self) -> TypeId;
 45    fn payload_type_name(&self) -> &'static str;
 46    fn as_any(&self) -> &dyn Any;
 47    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
 48    fn is_background(&self) -> bool;
 49    fn original_sender_id(&self) -> Option<PeerId>;
 50    fn sender_id(&self) -> ConnectionId;
 51    fn message_id(&self) -> u32;
 52}
 53
 54pub enum MessagePriority {
 55    Foreground,
 56    Background,
 57}
 58
 59impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
 60    fn payload_type_id(&self) -> TypeId {
 61        TypeId::of::<T>()
 62    }
 63
 64    fn payload_type_name(&self) -> &'static str {
 65        T::NAME
 66    }
 67
 68    fn as_any(&self) -> &dyn Any {
 69        self
 70    }
 71
 72    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
 73        self
 74    }
 75
 76    fn is_background(&self) -> bool {
 77        matches!(T::PRIORITY, MessagePriority::Background)
 78    }
 79
 80    fn original_sender_id(&self) -> Option<PeerId> {
 81        self.original_sender_id
 82    }
 83
 84    fn sender_id(&self) -> ConnectionId {
 85        self.sender_id
 86    }
 87
 88    fn message_id(&self) -> u32 {
 89        self.message_id
 90    }
 91}
 92
 93impl PeerId {
 94    pub fn from_u64(peer_id: u64) -> Self {
 95        let owner_id = (peer_id >> 32) as u32;
 96        let id = peer_id as u32;
 97        Self { owner_id, id }
 98    }
 99
100    pub fn as_u64(self) -> u64 {
101        ((self.owner_id as u64) << 32) | (self.id as u64)
102    }
103}
104
105impl Copy for PeerId {}
106
107impl Eq for PeerId {}
108
109impl Ord for PeerId {
110    fn cmp(&self, other: &Self) -> cmp::Ordering {
111        self.owner_id
112            .cmp(&other.owner_id)
113            .then_with(|| self.id.cmp(&other.id))
114    }
115}
116
117impl PartialOrd for PeerId {
118    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
119        Some(self.cmp(other))
120    }
121}
122
123impl std::hash::Hash for PeerId {
124    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
125        self.owner_id.hash(state);
126        self.id.hash(state);
127    }
128}
129
130impl fmt::Display for PeerId {
131    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
132        write!(f, "{}/{}", self.owner_id, self.id)
133    }
134}
135
// Invokes the `messages!` macro (imported from the parent module) with one
// `(MessageType, Priority)` pair per message type. The second element of each
// pair is a `MessagePriority` variant name: `Foreground` or `Background`.
// NOTE(review): the macro body is not visible in this file; presumably it
// implements `EnvelopedMessage` (NAME / PRIORITY / envelope conversions) for
// every listed type — confirm against the macro definition.
136messages!(
137    (Ack, Foreground),
138    (AckBufferOperation, Background),
139    (AckChannelMessage, Background),
140    (AddNotification, Foreground),
141    (AddProjectCollaborator, Foreground),
142    (ApplyCodeAction, Background),
143    (ApplyCodeActionResponse, Background),
144    (ApplyCompletionAdditionalEdits, Background),
145    (ApplyCompletionAdditionalEditsResponse, Background),
146    (BufferReloaded, Foreground),
147    (BufferSaved, Foreground),
148    (Call, Foreground),
149    (CallCanceled, Foreground),
150    (CancelCall, Foreground),
151    (ChannelMessageSent, Foreground),
152    (ChannelMessageUpdate, Foreground),
153    (CompleteWithLanguageModel, Background),
154    (ComputeEmbeddings, Background),
155    (ComputeEmbeddingsResponse, Background),
156    (CopyProjectEntry, Foreground),
157    (CountTokensWithLanguageModel, Background),
158    (CountTokensResponse, Background),
159    (CreateBufferForPeer, Foreground),
160    (CreateChannel, Foreground),
161    (CreateChannelResponse, Foreground),
162    (CreateProjectEntry, Foreground),
163    (CreateRoom, Foreground),
164    (CreateRoomResponse, Foreground),
165    (DeclineCall, Foreground),
166    (DeleteChannel, Foreground),
167    (DeleteNotification, Foreground),
168    (UpdateNotification, Foreground),
169    (DeleteProjectEntry, Foreground),
170    (EndStream, Foreground),
171    (Error, Foreground),
172    (ExpandProjectEntry, Foreground),
173    (ExpandProjectEntryResponse, Foreground),
174    (Follow, Foreground),
175    (FollowResponse, Foreground),
176    (FormatBuffers, Foreground),
177    (FormatBuffersResponse, Foreground),
178    (FuzzySearchUsers, Foreground),
179    (GetCachedEmbeddings, Background),
180    (GetCachedEmbeddingsResponse, Background),
181    (GetChannelMembers, Foreground),
182    (GetChannelMembersResponse, Foreground),
183    (GetChannelMessages, Background),
184    (GetChannelMessagesById, Background),
185    (GetChannelMessagesResponse, Background),
186    (GetCodeActions, Background),
187    (GetCodeActionsResponse, Background),
188    (GetCompletions, Background),
189    (GetCompletionsResponse, Background),
190    (GetDefinition, Background),
191    (GetDefinitionResponse, Background),
192    (GetDocumentHighlights, Background),
193    (GetDocumentHighlightsResponse, Background),
194    (GetHover, Background),
195    (GetHoverResponse, Background),
196    (GetNotifications, Foreground),
197    (GetNotificationsResponse, Foreground),
198    (GetPrivateUserInfo, Foreground),
199    (GetPrivateUserInfoResponse, Foreground),
200    (GetProjectSymbols, Background),
201    (GetProjectSymbolsResponse, Background),
202    (GetReferences, Background),
203    (GetReferencesResponse, Background),
204    (GetTypeDefinition, Background),
205    (GetTypeDefinitionResponse, Background),
206    (GetImplementation, Background),
207    (GetImplementationResponse, Background),
208    (GetUsers, Foreground),
209    (Hello, Foreground),
210    (IncomingCall, Foreground),
211    (InlayHints, Background),
212    (InlayHintsResponse, Background),
213    (InviteChannelMember, Foreground),
214    (JoinChannel, Foreground),
215    (JoinChannelBuffer, Foreground),
216    (JoinChannelBufferResponse, Foreground),
217    (JoinChannelChat, Foreground),
218    (JoinChannelChatResponse, Foreground),
219    (JoinProject, Foreground),
220    (JoinHostedProject, Foreground),
221    (JoinProjectResponse, Foreground),
222    (JoinRoom, Foreground),
223    (JoinRoomResponse, Foreground),
224    (LanguageModelResponse, Background),
225    (LeaveChannelBuffer, Background),
226    (LeaveChannelChat, Foreground),
227    (LeaveProject, Foreground),
228    (LeaveRoom, Foreground),
229    (MarkNotificationRead, Foreground),
230    (MoveChannel, Foreground),
231    (OnTypeFormatting, Background),
232    (OnTypeFormattingResponse, Background),
233    (OpenBufferById, Background),
234    (OpenBufferByPath, Background),
235    (OpenBufferForSymbol, Background),
236    (OpenBufferForSymbolResponse, Background),
237    (OpenBufferResponse, Background),
238    (PerformRename, Background),
239    (PerformRenameResponse, Background),
240    (Ping, Foreground),
241    (PrepareRename, Background),
242    (PrepareRenameResponse, Background),
243    (ProjectEntryResponse, Foreground),
244    (RefreshInlayHints, Foreground),
245    (RejoinChannelBuffers, Foreground),
246    (RejoinChannelBuffersResponse, Foreground),
247    (RejoinRoom, Foreground),
248    (RejoinRoomResponse, Foreground),
249    (ReloadBuffers, Foreground),
250    (ReloadBuffersResponse, Foreground),
251    (RemoveChannelMember, Foreground),
252    (RemoveChannelMessage, Foreground),
253    (UpdateChannelMessage, Foreground),
254    (RemoveContact, Foreground),
255    (RemoveProjectCollaborator, Foreground),
256    (RenameChannel, Foreground),
257    (RenameChannelResponse, Foreground),
258    (RenameProjectEntry, Foreground),
259    (RequestContact, Foreground),
260    (ResolveCompletionDocumentation, Background),
261    (ResolveCompletionDocumentationResponse, Background),
262    (ResolveInlayHint, Background),
263    (ResolveInlayHintResponse, Background),
264    (RespondToChannelInvite, Foreground),
265    (RespondToContactRequest, Foreground),
266    (RoomUpdated, Foreground),
267    (SaveBuffer, Foreground),
268    (SetChannelMemberRole, Foreground),
269    (SetChannelVisibility, Foreground),
270    (SearchProject, Background),
271    (SearchProjectResponse, Background),
272    (SendChannelMessage, Background),
273    (SendChannelMessageResponse, Background),
274    (ShareProject, Foreground),
275    (ShareProjectResponse, Foreground),
276    (ShowContacts, Foreground),
277    (StartLanguageServer, Foreground),
278    (SynchronizeBuffers, Foreground),
279    (SynchronizeBuffersResponse, Foreground),
280    (Test, Foreground),
281    (Unfollow, Foreground),
282    (UnshareProject, Foreground),
283    (UpdateBuffer, Foreground),
284    (UpdateBufferFile, Foreground),
285    (UpdateChannelBuffer, Foreground),
286    (UpdateChannelBufferCollaborators, Foreground),
287    (UpdateChannels, Foreground),
288    (UpdateUserChannels, Foreground),
289    (UpdateContacts, Foreground),
290    (UpdateDiagnosticSummary, Foreground),
291    (UpdateDiffBase, Foreground),
292    (UpdateFollowers, Foreground),
293    (UpdateInviteInfo, Foreground),
294    (UpdateLanguageServer, Foreground),
295    (UpdateParticipantLocation, Foreground),
296    (UpdateProject, Foreground),
297    (UpdateProjectCollaborator, Foreground),
298    (UpdateWorktree, Foreground),
299    (UpdateWorktreeSettings, Foreground),
300    (UsersResponse, Foreground),
301    (LspExtExpandMacro, Background),
302    (LspExtExpandMacroResponse, Background),
303    (SetRoomParticipantRole, Foreground),
304    (BlameBuffer, Foreground),
305    (BlameBufferResponse, Foreground),
306    (CreateRemoteProject, Background),
307    (CreateRemoteProjectResponse, Foreground),
308    (CreateDevServer, Foreground),
309    (CreateDevServerResponse, Foreground),
310    (DevServerInstructions, Foreground),
311    (ShutdownDevServer, Foreground),
312    (ReconnectDevServer, Foreground),
313    (ReconnectDevServerResponse, Foreground),
314    (ShareRemoteProject, Foreground),
315    (JoinRemoteProject, Foreground),
316    (RejoinRemoteProjects, Foreground),
317    (RejoinRemoteProjectsResponse, Foreground),
318    (MultiLspQuery, Background),
319    (MultiLspQueryResponse, Background),
320    (RemoteProjectsUpdate, Foreground),
321    (ValidateRemoteProjectRequest, Background),
322    (DeleteDevServer, Foreground)
323);
324
// Invokes the `request_messages!` macro (imported from the parent module)
// with `(Request, Response)` pairs. Several requests share a response type
// (for example `Ack`, `ProjectEntryResponse`, `JoinProjectResponse`), and
// `Test` is paired with itself.
// NOTE(review): the macro body is not visible in this file; presumably it
// implements `RequestMessage` for the first type with `Response` set to the
// second — confirm against the macro definition.
325request_messages!(
326    (ApplyCodeAction, ApplyCodeActionResponse),
327    (
328        ApplyCompletionAdditionalEdits,
329        ApplyCompletionAdditionalEditsResponse
330    ),
331    (Call, Ack),
332    (CancelCall, Ack),
333    (CopyProjectEntry, ProjectEntryResponse),
334    (CompleteWithLanguageModel, LanguageModelResponse),
335    (ComputeEmbeddings, ComputeEmbeddingsResponse),
336    (CountTokensWithLanguageModel, CountTokensResponse),
337    (CreateChannel, CreateChannelResponse),
338    (CreateProjectEntry, ProjectEntryResponse),
339    (CreateRoom, CreateRoomResponse),
340    (DeclineCall, Ack),
341    (DeleteChannel, Ack),
342    (DeleteProjectEntry, ProjectEntryResponse),
343    (ExpandProjectEntry, ExpandProjectEntryResponse),
344    (Follow, FollowResponse),
345    (FormatBuffers, FormatBuffersResponse),
346    (FuzzySearchUsers, UsersResponse),
347    (GetCachedEmbeddings, GetCachedEmbeddingsResponse),
348    (GetChannelMembers, GetChannelMembersResponse),
349    (GetChannelMessages, GetChannelMessagesResponse),
350    (GetChannelMessagesById, GetChannelMessagesResponse),
351    (GetCodeActions, GetCodeActionsResponse),
352    (GetCompletions, GetCompletionsResponse),
353    (GetDefinition, GetDefinitionResponse),
354    (GetImplementation, GetImplementationResponse),
355    (GetDocumentHighlights, GetDocumentHighlightsResponse),
356    (GetHover, GetHoverResponse),
357    (GetNotifications, GetNotificationsResponse),
358    (GetPrivateUserInfo, GetPrivateUserInfoResponse),
359    (GetProjectSymbols, GetProjectSymbolsResponse),
360    (GetReferences, GetReferencesResponse),
361    (GetTypeDefinition, GetTypeDefinitionResponse),
362    (GetUsers, UsersResponse),
363    (IncomingCall, Ack),
364    (InlayHints, InlayHintsResponse),
365    (InviteChannelMember, Ack),
366    (JoinChannel, JoinRoomResponse),
367    (JoinChannelBuffer, JoinChannelBufferResponse),
368    (JoinChannelChat, JoinChannelChatResponse),
369    (JoinHostedProject, JoinProjectResponse),
370    (JoinProject, JoinProjectResponse),
371    (JoinRoom, JoinRoomResponse),
372    (LeaveChannelBuffer, Ack),
373    (LeaveRoom, Ack),
374    (MarkNotificationRead, Ack),
375    (MoveChannel, Ack),
376    (OnTypeFormatting, OnTypeFormattingResponse),
377    (OpenBufferById, OpenBufferResponse),
378    (OpenBufferByPath, OpenBufferResponse),
379    (OpenBufferForSymbol, OpenBufferForSymbolResponse),
380    (PerformRename, PerformRenameResponse),
381    (Ping, Ack),
382    (PrepareRename, PrepareRenameResponse),
383    (RefreshInlayHints, Ack),
384    (RejoinChannelBuffers, RejoinChannelBuffersResponse),
385    (RejoinRoom, RejoinRoomResponse),
386    (ReloadBuffers, ReloadBuffersResponse),
387    (RemoveChannelMember, Ack),
388    (RemoveChannelMessage, Ack),
389    (UpdateChannelMessage, Ack),
390    (RemoveContact, Ack),
391    (RenameChannel, RenameChannelResponse),
392    (RenameProjectEntry, ProjectEntryResponse),
393    (RequestContact, Ack),
394    (
395        ResolveCompletionDocumentation,
396        ResolveCompletionDocumentationResponse
397    ),
398    (ResolveInlayHint, ResolveInlayHintResponse),
399    (RespondToChannelInvite, Ack),
400    (RespondToContactRequest, Ack),
401    (SaveBuffer, BufferSaved),
402    (SearchProject, SearchProjectResponse),
403    (SendChannelMessage, SendChannelMessageResponse),
404    (SetChannelMemberRole, Ack),
405    (SetChannelVisibility, Ack),
406    (ShareProject, ShareProjectResponse),
407    (SynchronizeBuffers, SynchronizeBuffersResponse),
408    (Test, Test),
409    (UpdateBuffer, Ack),
410    (UpdateParticipantLocation, Ack),
411    (UpdateProject, Ack),
412    (UpdateWorktree, Ack),
413    (LspExtExpandMacro, LspExtExpandMacroResponse),
414    (SetRoomParticipantRole, Ack),
415    (BlameBuffer, BlameBufferResponse),
416    (CreateRemoteProject, CreateRemoteProjectResponse),
417    (CreateDevServer, CreateDevServerResponse),
418    (ShutdownDevServer, Ack),
419    (ShareRemoteProject, ShareProjectResponse),
420    (JoinRemoteProject, JoinProjectResponse),
421    (RejoinRemoteProjects, RejoinRemoteProjectsResponse),
422    (ReconnectDevServer, ReconnectDevServerResponse),
423    (ValidateRemoteProjectRequest, Ack),
424    (MultiLspQuery, MultiLspQueryResponse),
425    (DeleteDevServer, Ack),
426);
427
// Invokes the `entity_messages!` macro (imported from the parent module) for
// the message types listed below. The leading `{project_id, ShareProject}`
// group is passed ahead of the list.
// NOTE(review): the macro body is not visible in this file; presumably it
// implements `EntityMessage` for each type, using the `project_id` field as
// the remote entity id and `ShareProject` as the `Entity` type — confirm
// against the macro definition.
428entity_messages!(
429    {project_id, ShareProject},
430    AddProjectCollaborator,
431    ApplyCodeAction,
432    ApplyCompletionAdditionalEdits,
433    BlameBuffer,
434    BufferReloaded,
435    BufferSaved,
436    CopyProjectEntry,
437    CreateBufferForPeer,
438    CreateProjectEntry,
439    DeleteProjectEntry,
440    ExpandProjectEntry,
441    FormatBuffers,
442    GetCodeActions,
443    GetCompletions,
444    GetDefinition,
445    GetImplementation,
446    GetDocumentHighlights,
447    GetHover,
448    GetProjectSymbols,
449    GetReferences,
450    GetTypeDefinition,
451    InlayHints,
452    JoinProject,
453    LeaveProject,
454    MultiLspQuery,
455    OnTypeFormatting,
456    OpenBufferById,
457    OpenBufferByPath,
458    OpenBufferForSymbol,
459    PerformRename,
460    PrepareRename,
461    RefreshInlayHints,
462    ReloadBuffers,
463    RemoveProjectCollaborator,
464    RenameProjectEntry,
465    ResolveCompletionDocumentation,
466    ResolveInlayHint,
467    SaveBuffer,
468    SearchProject,
469    StartLanguageServer,
470    SynchronizeBuffers,
471    UnshareProject,
472    UpdateBuffer,
473    UpdateBufferFile,
474    UpdateDiagnosticSummary,
475    UpdateDiffBase,
476    UpdateLanguageServer,
477    UpdateProject,
478    UpdateProjectCollaborator,
479    UpdateWorktree,
480    UpdateWorktreeSettings,
481    LspExtExpandMacro,
482);
483
// Second `entity_messages!` invocation, this time with the group
// `{channel_id, Channel}` for the channel-related message types below.
// NOTE(review): presumably `channel_id` names the field used as the remote
// entity id and `Channel` the `Entity` type — confirm against the macro
// definition, which is not visible in this file.
484entity_messages!(
485    {channel_id, Channel},
486    ChannelMessageSent,
487    ChannelMessageUpdate,
488    RemoveChannelMessage,
489    UpdateChannelMessage,
490    UpdateChannelBuffer,
491    UpdateChannelBufferCollaborators,
492);
493
/// One kibibyte, in bytes.
const KIB: usize = 1 << 10;
/// One mebibyte, in bytes.
const MIB: usize = 1 << 20;
/// Capacity (in bytes) that `MessageStream` shrinks its scratch buffer back
/// down to after each message.
const MAX_BUFFER_LEN: usize = MIB;
497
/// Wraps an underlying transport `S` and exchanges protobuf messages over it.
pub struct MessageStream<S> {
    /// The wrapped sink and/or stream of WebSocket messages.
    stream: S,
    /// Scratch space reused between messages for the uncompressed protobuf
    /// bytes.
    encoding_buffer: Vec<u8>,
}
503
504#[allow(clippy::large_enum_variant)]
505#[derive(Debug)]
506pub enum Message {
507    Envelope(Envelope),
508    Ping,
509    Pong,
510}
511
512impl<S> MessageStream<S> {
513    pub fn new(stream: S) -> Self {
514        Self {
515            stream,
516            encoding_buffer: Vec::new(),
517        }
518    }
519
520    pub fn inner_mut(&mut self) -> &mut S {
521        &mut self.stream
522    }
523}
524
525impl<S> MessageStream<S>
526where
527    S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
528{
529    pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
530        #[cfg(any(test, feature = "test-support"))]
531        const COMPRESSION_LEVEL: i32 = -7;
532
533        #[cfg(not(any(test, feature = "test-support")))]
534        const COMPRESSION_LEVEL: i32 = 4;
535
536        match message {
537            Message::Envelope(message) => {
538                self.encoding_buffer.reserve(message.encoded_len());
539                message
540                    .encode(&mut self.encoding_buffer)
541                    .map_err(io::Error::from)?;
542                let buffer =
543                    zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
544                        .unwrap();
545
546                self.encoding_buffer.clear();
547                self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
548                self.stream.send(WebSocketMessage::Binary(buffer)).await?;
549            }
550            Message::Ping => {
551                self.stream
552                    .send(WebSocketMessage::Ping(Default::default()))
553                    .await?;
554            }
555            Message::Pong => {
556                self.stream
557                    .send(WebSocketMessage::Pong(Default::default()))
558                    .await?;
559            }
560        }
561
562        Ok(())
563    }
564}
565
566impl<S> MessageStream<S>
567where
568    S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
569{
570    pub async fn read(&mut self) -> Result<(Message, Instant), anyhow::Error> {
571        while let Some(bytes) = self.stream.next().await {
572            let received_at = Instant::now();
573            match bytes? {
574                WebSocketMessage::Binary(bytes) => {
575                    zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
576                    let envelope = Envelope::decode(self.encoding_buffer.as_slice())
577                        .map_err(io::Error::from)?;
578
579                    self.encoding_buffer.clear();
580                    self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
581                    return Ok((Message::Envelope(envelope), received_at));
582                }
583                WebSocketMessage::Ping(_) => return Ok((Message::Ping, received_at)),
584                WebSocketMessage::Pong(_) => return Ok((Message::Pong, received_at)),
585                WebSocketMessage::Close(_) => break,
586                _ => {}
587            }
588        }
589        Err(anyhow!("connection closed"))
590    }
591}
592
593impl From<Timestamp> for SystemTime {
594    fn from(val: Timestamp) -> Self {
595        UNIX_EPOCH
596            .checked_add(Duration::new(val.seconds, val.nanos))
597            .unwrap()
598    }
599}
600
601impl From<SystemTime> for Timestamp {
602    fn from(time: SystemTime) -> Self {
603        let duration = time.duration_since(UNIX_EPOCH).unwrap();
604        Self {
605            seconds: duration.as_secs(),
606            nanos: duration.subsec_nanos(),
607        }
608    }
609}
610
611impl From<u128> for Nonce {
612    fn from(nonce: u128) -> Self {
613        let upper_half = (nonce >> 64) as u64;
614        let lower_half = nonce as u64;
615        Self {
616            upper_half,
617            lower_half,
618        }
619    }
620}
621
622impl From<Nonce> for u128 {
623    fn from(nonce: Nonce) -> Self {
624        let upper_half = (nonce.upper_half as u128) << 64;
625        let lower_half = nonce.lower_half as u128;
626        upper_half | lower_half
627    }
628}
629
630pub fn split_worktree_update(
631    mut message: UpdateWorktree,
632    max_chunk_size: usize,
633) -> impl Iterator<Item = UpdateWorktree> {
634    let mut done_files = false;
635
636    let mut repository_map = message
637        .updated_repositories
638        .into_iter()
639        .map(|repo| (repo.work_directory_id, repo))
640        .collect::<HashMap<_, _>>();
641
642    iter::from_fn(move || {
643        if done_files {
644            return None;
645        }
646
647        let updated_entries_chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
648        let updated_entries: Vec<_> = message
649            .updated_entries
650            .drain(..updated_entries_chunk_size)
651            .collect();
652
653        let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size);
654        let removed_entries = message
655            .removed_entries
656            .drain(..removed_entries_chunk_size)
657            .collect();
658
659        done_files = message.updated_entries.is_empty() && message.removed_entries.is_empty();
660
661        let mut updated_repositories = Vec::new();
662
663        if !repository_map.is_empty() {
664            for entry in &updated_entries {
665                if let Some(repo) = repository_map.remove(&entry.id) {
666                    updated_repositories.push(repo)
667                }
668            }
669        }
670
671        let removed_repositories = if done_files {
672            mem::take(&mut message.removed_repositories)
673        } else {
674            Default::default()
675        };
676
677        if done_files {
678            updated_repositories.extend(mem::take(&mut repository_map).into_values());
679        }
680
681        Some(UpdateWorktree {
682            project_id: message.project_id,
683            worktree_id: message.worktree_id,
684            root_name: message.root_name.clone(),
685            abs_path: message.abs_path.clone(),
686            updated_entries,
687            removed_entries,
688            scan_id: message.scan_id,
689            is_last_update: done_files && message.is_last_update,
690            updated_repositories,
691            removed_repositories,
692        })
693    })
694}
695
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an envelope message carrying an `UpdateWorktree` whose root
    /// name is `"abcdefg"` repeated `repetitions` times.
    fn worktree_envelope(repetitions: usize) -> Message {
        let update = UpdateWorktree {
            root_name: "abcdefg".repeat(repetitions),
            ..Default::default()
        };
        Message::Envelope(Envelope {
            payload: Some(envelope::Payload::UpdateWorktree(update)),
            ..Default::default()
        })
    }

    /// The scratch buffer must never stay larger than `MAX_BUFFER_LEN`,
    /// whether the message written or read was small or large.
    #[gpui::test]
    async fn test_buffer_size() {
        let (tx, rx) = futures::channel::mpsc::unbounded();
        let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));

        // One small and one large message.
        for repetitions in [10, 1000000] {
            sink.write(worktree_envelope(repetitions)).await.unwrap();
            assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
        }

        let mut stream = MessageStream::new(rx.map(anyhow::Ok));
        for _ in 0..2 {
            stream.read().await.unwrap();
            assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
        }
    }

    /// Packing a `PeerId` into a `u64` and unpacking it again is lossless,
    /// including at the extremes of both fields.
    #[gpui::test]
    fn test_converting_peer_id_from_and_to_u64() {
        let cases = [
            (10, 3),
            (u32::MAX, 3),
            (10, u32::MAX),
            (u32::MAX, u32::MAX),
        ];
        for (owner_id, id) in cases {
            let peer_id = PeerId { owner_id, id };
            assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
        }
    }
}