proto.rs

  1#![allow(non_snake_case)]
  2
  3use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
  4use anyhow::{anyhow, Result};
  5use async_tungstenite::tungstenite::Message as WebSocketMessage;
  6use collections::HashMap;
  7use futures::{SinkExt as _, StreamExt as _};
  8use prost::Message as _;
  9use serde::Serialize;
 10use std::any::{Any, TypeId};
 11use std::time::Instant;
 12use std::{
 13    cmp,
 14    fmt::Debug,
 15    io, iter,
 16    time::{Duration, SystemTime, UNIX_EPOCH},
 17};
 18use std::{fmt, mem};
 19
 20include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
 21
/// A message type that can be wrapped in, and extracted from, an `Envelope`.
///
/// NOTE(review): implementations are presumably generated by the `messages!`
/// invocation further down in this file; the macro itself is defined elsewhere.
pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
    /// Name of the message type; surfaced through
    /// `AnyTypedEnvelope::payload_type_name`.
    const NAME: &'static str;
    /// Priority class of the message; `AnyTypedEnvelope::is_background` reports
    /// whether this is `MessagePriority::Background`.
    const PRIORITY: MessagePriority;
    /// Consumes the message and wraps it in an `Envelope`.
    ///
    /// NOTE(review): `id` is presumably the id assigned to the new envelope,
    /// `responding_to` the id of the request being answered (if any), and
    /// `original_sender_id` the peer a forwarded message originated from —
    /// confirm against the generated implementation.
    fn into_envelope(
        self,
        id: u32,
        responding_to: Option<u32>,
        original_sender_id: Option<PeerId>,
    ) -> Envelope;
    /// Tries to extract a message of this type from `envelope`.
    ///
    /// NOTE(review): presumably returns `None` when the envelope carries a
    /// different payload type — confirm against the generated implementation.
    fn from_envelope(envelope: Envelope) -> Option<Self>;
}
 33
/// An enveloped message that is associated with a remote entity id.
///
/// NOTE(review): implementations are presumably generated by the
/// `entity_messages!` invocations in this file.
pub trait EntityMessage: EnvelopedMessage {
    /// Type associated with the entity this message targets.
    /// NOTE(review): presumably taken from the second element of the
    /// `{field, Type}` header passed to `entity_messages!` — confirm.
    type Entity;
    /// Returns the id of the remote entity this message refers to.
    fn remote_entity_id(&self) -> u64;
}
 38
/// An enveloped message that has an associated response message type.
///
/// NOTE(review): the request/response pairs are presumably the ones listed in
/// the `request_messages!` invocation in this file.
pub trait RequestMessage: EnvelopedMessage {
    /// Message type associated with this request as its response.
    type Response: EnvelopedMessage;
}
 42
/// Object-safe, type-erased view of a `TypedEnvelope<T>`, for code that handles
/// envelopes without knowing the payload type statically.
pub trait AnyTypedEnvelope: 'static + Send + Sync {
    /// `TypeId` of the payload message type (not of the envelope itself).
    fn payload_type_id(&self) -> TypeId;
    /// Name of the payload message type (`EnvelopedMessage::NAME`).
    fn payload_type_name(&self) -> &'static str;
    /// Borrows the envelope as `&dyn Any` so it can be downcast by reference.
    fn as_any(&self) -> &dyn Any;
    /// Converts the boxed envelope into a boxed `Any` so it can be downcast by value.
    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
    /// Whether the payload type's priority is `MessagePriority::Background`.
    fn is_background(&self) -> bool;
    /// The envelope's `original_sender_id` field, if set.
    fn original_sender_id(&self) -> Option<PeerId>;
    /// The envelope's `sender_id` field.
    fn sender_id(&self) -> ConnectionId;
    /// The envelope's `message_id` field.
    fn message_id(&self) -> u32;
}
 53
/// Priority class attached to every message type via `EnvelopedMessage::PRIORITY`.
///
/// NOTE(review): how the two classes are scheduled is decided by code outside
/// this file; this file only exposes the distinction through
/// `AnyTypedEnvelope::is_background`.
pub enum MessagePriority {
    /// The message type is not a background message.
    Foreground,
    /// The message type is a background message.
    Background,
}
 58
 59impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
 60    fn payload_type_id(&self) -> TypeId {
 61        TypeId::of::<T>()
 62    }
 63
 64    fn payload_type_name(&self) -> &'static str {
 65        T::NAME
 66    }
 67
 68    fn as_any(&self) -> &dyn Any {
 69        self
 70    }
 71
 72    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
 73        self
 74    }
 75
 76    fn is_background(&self) -> bool {
 77        matches!(T::PRIORITY, MessagePriority::Background)
 78    }
 79
 80    fn original_sender_id(&self) -> Option<PeerId> {
 81        self.original_sender_id
 82    }
 83
 84    fn sender_id(&self) -> ConnectionId {
 85        self.sender_id
 86    }
 87
 88    fn message_id(&self) -> u32 {
 89        self.message_id
 90    }
 91}
 92
 93impl PeerId {
 94    pub fn from_u64(peer_id: u64) -> Self {
 95        let owner_id = (peer_id >> 32) as u32;
 96        let id = peer_id as u32;
 97        Self { owner_id, id }
 98    }
 99
100    pub fn as_u64(self) -> u64 {
101        ((self.owner_id as u64) << 32) | (self.id as u64)
102    }
103}
104
// `PeerId` holds only two `u32` fields, so it is cheap to copy by value.
// NOTE(review): the required `Clone` impl is not in this file — presumably it
// comes from the generated code pulled in by `include!`.
impl Copy for PeerId {}
106
// Marks `PeerId` equality as a full equivalence relation (needed for `Ord`).
// NOTE(review): `PartialEq` is not implemented in this file — presumably it
// comes from the generated code pulled in by `include!`.
impl Eq for PeerId {}
108
109impl Ord for PeerId {
110    fn cmp(&self, other: &Self) -> cmp::Ordering {
111        self.owner_id
112            .cmp(&other.owner_id)
113            .then_with(|| self.id.cmp(&other.id))
114    }
115}
116
/// Delegates to the `Ord` impl so the partial and total orders always agree.
impl PartialOrd for PeerId {
    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
        // Never `None`: `PeerId` is totally ordered.
        Some(self.cmp(other))
    }
}
122
123impl std::hash::Hash for PeerId {
124    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
125        self.owner_id.hash(state);
126        self.id.hash(state);
127    }
128}
129
130impl fmt::Display for PeerId {
131    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
132        write!(f, "{}/{}", self.owner_id, self.id)
133    }
134}
135
// Declares every message type handled by this module, each paired with the
// `MessagePriority` variant (`Foreground` or `Background`) assigned to it.
// NOTE(review): `messages!` is defined outside this file (imported from
// `super`); presumably it generates the `EnvelopedMessage` impls — confirm there.
messages!(
    (Ack, Foreground),
    (AckBufferOperation, Background),
    (AckChannelMessage, Background),
    (AddNotification, Foreground),
    (AddProjectCollaborator, Foreground),
    (ApplyCodeAction, Background),
    (ApplyCodeActionResponse, Background),
    (ApplyCompletionAdditionalEdits, Background),
    (ApplyCompletionAdditionalEditsResponse, Background),
    (BufferReloaded, Foreground),
    (BufferSaved, Foreground),
    (Call, Foreground),
    (CallCanceled, Foreground),
    (CancelCall, Foreground),
    (ChannelMessageSent, Foreground),
    (ChannelMessageUpdate, Foreground),
    (CompleteWithLanguageModel, Background),
    (ComputeEmbeddings, Background),
    (ComputeEmbeddingsResponse, Background),
    (CopyProjectEntry, Foreground),
    (CountTokensWithLanguageModel, Background),
    (CountTokensResponse, Background),
    (CreateBufferForPeer, Foreground),
    (CreateChannel, Foreground),
    (CreateChannelResponse, Foreground),
    (CreateProjectEntry, Foreground),
    (CreateRoom, Foreground),
    (CreateRoomResponse, Foreground),
    (DeclineCall, Foreground),
    (DeleteChannel, Foreground),
    (DeleteNotification, Foreground),
    (UpdateNotification, Foreground),
    (DeleteProjectEntry, Foreground),
    (EndStream, Foreground),
    (Error, Foreground),
    (ExpandProjectEntry, Foreground),
    (ExpandProjectEntryResponse, Foreground),
    (Follow, Foreground),
    (FollowResponse, Foreground),
    (FormatBuffers, Foreground),
    (FormatBuffersResponse, Foreground),
    (FuzzySearchUsers, Foreground),
    (GetCachedEmbeddings, Background),
    (GetCachedEmbeddingsResponse, Background),
    (GetChannelMembers, Foreground),
    (GetChannelMembersResponse, Foreground),
    (GetChannelMessages, Background),
    (GetChannelMessagesById, Background),
    (GetChannelMessagesResponse, Background),
    (GetCodeActions, Background),
    (GetCodeActionsResponse, Background),
    (GetCompletions, Background),
    (GetCompletionsResponse, Background),
    (GetDefinition, Background),
    (GetDefinitionResponse, Background),
    (GetDocumentHighlights, Background),
    (GetDocumentHighlightsResponse, Background),
    (GetHover, Background),
    (GetHoverResponse, Background),
    (GetNotifications, Foreground),
    (GetNotificationsResponse, Foreground),
    (GetPrivateUserInfo, Foreground),
    (GetPrivateUserInfoResponse, Foreground),
    (GetProjectSymbols, Background),
    (GetProjectSymbolsResponse, Background),
    (GetReferences, Background),
    (GetReferencesResponse, Background),
    (GetTypeDefinition, Background),
    (GetTypeDefinitionResponse, Background),
    (GetImplementation, Background),
    (GetImplementationResponse, Background),
    (GetUsers, Foreground),
    (Hello, Foreground),
    (IncomingCall, Foreground),
    (InlayHints, Background),
    (InlayHintsResponse, Background),
    (InviteChannelMember, Foreground),
    (JoinChannel, Foreground),
    (JoinChannelBuffer, Foreground),
    (JoinChannelBufferResponse, Foreground),
    (JoinChannelChat, Foreground),
    (JoinChannelChatResponse, Foreground),
    (JoinProject, Foreground),
    (JoinHostedProject, Foreground),
    (JoinProjectResponse, Foreground),
    (JoinRoom, Foreground),
    (JoinRoomResponse, Foreground),
    (LanguageModelResponse, Background),
    (LeaveChannelBuffer, Background),
    (LeaveChannelChat, Foreground),
    (LeaveProject, Foreground),
    (LeaveRoom, Foreground),
    (MarkNotificationRead, Foreground),
    (MoveChannel, Foreground),
    (OnTypeFormatting, Background),
    (OnTypeFormattingResponse, Background),
    (OpenBufferById, Background),
    (OpenBufferByPath, Background),
    (OpenBufferForSymbol, Background),
    (OpenBufferForSymbolResponse, Background),
    (OpenBufferResponse, Background),
    (PerformRename, Background),
    (PerformRenameResponse, Background),
    (Ping, Foreground),
    (PrepareRename, Background),
    (PrepareRenameResponse, Background),
    (ProjectEntryResponse, Foreground),
    (RefreshInlayHints, Foreground),
    (RejoinChannelBuffers, Foreground),
    (RejoinChannelBuffersResponse, Foreground),
    (RejoinRoom, Foreground),
    (RejoinRoomResponse, Foreground),
    (ReloadBuffers, Foreground),
    (ReloadBuffersResponse, Foreground),
    (RemoveChannelMember, Foreground),
    (RemoveChannelMessage, Foreground),
    (UpdateChannelMessage, Foreground),
    (RemoveContact, Foreground),
    (RemoveProjectCollaborator, Foreground),
    (RenameChannel, Foreground),
    (RenameChannelResponse, Foreground),
    (RenameProjectEntry, Foreground),
    (RequestContact, Foreground),
    (ResolveCompletionDocumentation, Background),
    (ResolveCompletionDocumentationResponse, Background),
    (ResolveInlayHint, Background),
    (ResolveInlayHintResponse, Background),
    (RespondToChannelInvite, Foreground),
    (RespondToContactRequest, Foreground),
    (RoomUpdated, Foreground),
    (SaveBuffer, Foreground),
    (SetChannelMemberRole, Foreground),
    (SetChannelVisibility, Foreground),
    (SearchProject, Background),
    (SearchProjectResponse, Background),
    (SendChannelMessage, Background),
    (SendChannelMessageResponse, Background),
    (ShareProject, Foreground),
    (ShareProjectResponse, Foreground),
    (ShowContacts, Foreground),
    (StartLanguageServer, Foreground),
    (SynchronizeBuffers, Foreground),
    (SynchronizeBuffersResponse, Foreground),
    (Test, Foreground),
    (Unfollow, Foreground),
    (UnshareProject, Foreground),
    (UpdateBuffer, Foreground),
    (UpdateBufferFile, Foreground),
    (UpdateChannelBuffer, Foreground),
    (UpdateChannelBufferCollaborators, Foreground),
    (UpdateChannels, Foreground),
    (UpdateUserChannels, Foreground),
    (UpdateContacts, Foreground),
    (UpdateDiagnosticSummary, Foreground),
    (UpdateDiffBase, Foreground),
    (UpdateFollowers, Foreground),
    (UpdateInviteInfo, Foreground),
    (UpdateLanguageServer, Foreground),
    (UpdateParticipantLocation, Foreground),
    (UpdateProject, Foreground),
    (UpdateProjectCollaborator, Foreground),
    (UpdateWorktree, Foreground),
    (UpdateWorktreeSettings, Foreground),
    (UsersResponse, Foreground),
    (LspExtExpandMacro, Background),
    (LspExtExpandMacroResponse, Background),
    (SetRoomParticipantRole, Foreground),
    (BlameBuffer, Foreground),
    (BlameBufferResponse, Foreground),
    (CreateRemoteProject, Background),
    (CreateRemoteProjectResponse, Foreground),
    (CreateDevServer, Foreground),
    (CreateDevServerResponse, Foreground),
    (DevServerInstructions, Foreground),
    (ShutdownDevServer, Foreground),
    (ReconnectDevServer, Foreground),
    (ReconnectDevServerResponse, Foreground),
    (ShareRemoteProject, Foreground),
    (JoinRemoteProject, Foreground),
    (RejoinRemoteProjects, Foreground),
    (RejoinRemoteProjectsResponse, Foreground),
    (MultiLspQuery, Background),
    (MultiLspQueryResponse, Background),
    (RemoteProjectsUpdate, Foreground),
    (ValidateRemoteProjectRequest, Background),
    (DeleteDevServer, Foreground),
    (OpenNewBuffer, Foreground)
);
325
// Pairs each request message type (first element) with the message type sent
// in reply (second element). Several requests share a response type, e.g. `Ack`.
// NOTE(review): `request_messages!` is defined outside this file; presumably it
// generates `RequestMessage` impls with `Response` set to the second element —
// confirm there.
request_messages!(
    (ApplyCodeAction, ApplyCodeActionResponse),
    (
        ApplyCompletionAdditionalEdits,
        ApplyCompletionAdditionalEditsResponse
    ),
    (Call, Ack),
    (CancelCall, Ack),
    (CopyProjectEntry, ProjectEntryResponse),
    (CompleteWithLanguageModel, LanguageModelResponse),
    (ComputeEmbeddings, ComputeEmbeddingsResponse),
    (CountTokensWithLanguageModel, CountTokensResponse),
    (CreateChannel, CreateChannelResponse),
    (CreateProjectEntry, ProjectEntryResponse),
    (CreateRoom, CreateRoomResponse),
    (DeclineCall, Ack),
    (DeleteChannel, Ack),
    (DeleteProjectEntry, ProjectEntryResponse),
    (ExpandProjectEntry, ExpandProjectEntryResponse),
    (Follow, FollowResponse),
    (FormatBuffers, FormatBuffersResponse),
    (FuzzySearchUsers, UsersResponse),
    (GetCachedEmbeddings, GetCachedEmbeddingsResponse),
    (GetChannelMembers, GetChannelMembersResponse),
    (GetChannelMessages, GetChannelMessagesResponse),
    (GetChannelMessagesById, GetChannelMessagesResponse),
    (GetCodeActions, GetCodeActionsResponse),
    (GetCompletions, GetCompletionsResponse),
    (GetDefinition, GetDefinitionResponse),
    (GetImplementation, GetImplementationResponse),
    (GetDocumentHighlights, GetDocumentHighlightsResponse),
    (GetHover, GetHoverResponse),
    (GetNotifications, GetNotificationsResponse),
    (GetPrivateUserInfo, GetPrivateUserInfoResponse),
    (GetProjectSymbols, GetProjectSymbolsResponse),
    (GetReferences, GetReferencesResponse),
    (GetTypeDefinition, GetTypeDefinitionResponse),
    (GetUsers, UsersResponse),
    (IncomingCall, Ack),
    (InlayHints, InlayHintsResponse),
    (InviteChannelMember, Ack),
    (JoinChannel, JoinRoomResponse),
    (JoinChannelBuffer, JoinChannelBufferResponse),
    (JoinChannelChat, JoinChannelChatResponse),
    (JoinHostedProject, JoinProjectResponse),
    (JoinProject, JoinProjectResponse),
    (JoinRoom, JoinRoomResponse),
    (LeaveChannelBuffer, Ack),
    (LeaveRoom, Ack),
    (MarkNotificationRead, Ack),
    (MoveChannel, Ack),
    (OnTypeFormatting, OnTypeFormattingResponse),
    (OpenBufferById, OpenBufferResponse),
    (OpenBufferByPath, OpenBufferResponse),
    (OpenBufferForSymbol, OpenBufferForSymbolResponse),
    (OpenNewBuffer, OpenBufferResponse),
    (PerformRename, PerformRenameResponse),
    (Ping, Ack),
    (PrepareRename, PrepareRenameResponse),
    (RefreshInlayHints, Ack),
    (RejoinChannelBuffers, RejoinChannelBuffersResponse),
    (RejoinRoom, RejoinRoomResponse),
    (ReloadBuffers, ReloadBuffersResponse),
    (RemoveChannelMember, Ack),
    (RemoveChannelMessage, Ack),
    (UpdateChannelMessage, Ack),
    (RemoveContact, Ack),
    (RenameChannel, RenameChannelResponse),
    (RenameProjectEntry, ProjectEntryResponse),
    (RequestContact, Ack),
    (
        ResolveCompletionDocumentation,
        ResolveCompletionDocumentationResponse
    ),
    (ResolveInlayHint, ResolveInlayHintResponse),
    (RespondToChannelInvite, Ack),
    (RespondToContactRequest, Ack),
    (SaveBuffer, BufferSaved),
    (SearchProject, SearchProjectResponse),
    (SendChannelMessage, SendChannelMessageResponse),
    (SetChannelMemberRole, Ack),
    (SetChannelVisibility, Ack),
    (ShareProject, ShareProjectResponse),
    (SynchronizeBuffers, SynchronizeBuffersResponse),
    (Test, Test),
    (UpdateBuffer, Ack),
    (UpdateParticipantLocation, Ack),
    (UpdateProject, Ack),
    (UpdateWorktree, Ack),
    (LspExtExpandMacro, LspExtExpandMacroResponse),
    (SetRoomParticipantRole, Ack),
    (BlameBuffer, BlameBufferResponse),
    (CreateRemoteProject, CreateRemoteProjectResponse),
    (CreateDevServer, CreateDevServerResponse),
    (ShutdownDevServer, Ack),
    (ShareRemoteProject, ShareProjectResponse),
    (JoinRemoteProject, JoinProjectResponse),
    (RejoinRemoteProjects, RejoinRemoteProjectsResponse),
    (ReconnectDevServer, ReconnectDevServerResponse),
    (ValidateRemoteProjectRequest, Ack),
    (MultiLspQuery, MultiLspQueryResponse),
    (DeleteDevServer, Ack),
);
429
// Lists the message types associated with the `{project_id, ShareProject}` header.
// NOTE(review): `entity_messages!` is defined outside this file; presumably each
// listed message gets an `EntityMessage` impl whose `remote_entity_id` reads the
// message's `project_id` field and whose `Entity` is `ShareProject` — confirm there.
entity_messages!(
    {project_id, ShareProject},
    AddProjectCollaborator,
    ApplyCodeAction,
    ApplyCompletionAdditionalEdits,
    BlameBuffer,
    BufferReloaded,
    BufferSaved,
    CopyProjectEntry,
    CreateBufferForPeer,
    CreateProjectEntry,
    DeleteProjectEntry,
    ExpandProjectEntry,
    FormatBuffers,
    GetCodeActions,
    GetCompletions,
    GetDefinition,
    GetImplementation,
    GetDocumentHighlights,
    GetHover,
    GetProjectSymbols,
    GetReferences,
    GetTypeDefinition,
    InlayHints,
    JoinProject,
    LeaveProject,
    MultiLspQuery,
    OnTypeFormatting,
    OpenNewBuffer,
    OpenBufferById,
    OpenBufferByPath,
    OpenBufferForSymbol,
    PerformRename,
    PrepareRename,
    RefreshInlayHints,
    ReloadBuffers,
    RemoveProjectCollaborator,
    RenameProjectEntry,
    ResolveCompletionDocumentation,
    ResolveInlayHint,
    SaveBuffer,
    SearchProject,
    StartLanguageServer,
    SynchronizeBuffers,
    UnshareProject,
    UpdateBuffer,
    UpdateBufferFile,
    UpdateDiagnosticSummary,
    UpdateDiffBase,
    UpdateLanguageServer,
    UpdateProject,
    UpdateProjectCollaborator,
    UpdateWorktree,
    UpdateWorktreeSettings,
    LspExtExpandMacro,
);
486
// Lists the message types associated with the `{channel_id, Channel}` header.
// NOTE(review): presumably each listed message gets an `EntityMessage` impl whose
// `remote_entity_id` reads the message's `channel_id` field and whose `Entity`
// is `Channel` — confirm against the macro definition (outside this file).
entity_messages!(
    {channel_id, Channel},
    ChannelMessageSent,
    ChannelMessageUpdate,
    RemoveChannelMessage,
    UpdateChannelMessage,
    UpdateChannelBuffer,
    UpdateChannelBufferCollaborators,
);
496
/// Number of bytes in one kibibyte.
const KIB: usize = 1024;
/// Number of bytes in one mebibyte.
const MIB: usize = KIB * 1024;
/// Capacity (1 MiB) that `MessageStream` shrinks its scratch buffer down to
/// after each envelope, so one large message does not pin a large allocation.
const MAX_BUFFER_LEN: usize = MIB;
500
/// A stream of protobuf messages layered over a WebSocket-style sink and/or stream.
///
/// `write` is available when `S` is a sink of WebSocket messages and `read`
/// when `S` is a stream of them; see the corresponding `impl` blocks.
pub struct MessageStream<S> {
    /// The wrapped transport.
    stream: S,
    /// Scratch buffer reused across calls: holds the protobuf bytes of the
    /// envelope being written, or the decompressed bytes of the one being read.
    encoding_buffer: Vec<u8>,
}
506
/// A unit of traffic read from or written to a `MessageStream`.
// The `Envelope` variant is much larger than the unit variants, hence the allow.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum Message {
    /// A protobuf envelope; sent as a compressed binary frame.
    Envelope(Envelope),
    /// A ping frame.
    Ping,
    /// A pong frame.
    Pong,
}
514
515impl<S> MessageStream<S> {
516    pub fn new(stream: S) -> Self {
517        Self {
518            stream,
519            encoding_buffer: Vec::new(),
520        }
521    }
522
523    pub fn inner_mut(&mut self) -> &mut S {
524        &mut self.stream
525    }
526}
527
528impl<S> MessageStream<S>
529where
530    S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
531{
532    pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
533        #[cfg(any(test, feature = "test-support"))]
534        const COMPRESSION_LEVEL: i32 = -7;
535
536        #[cfg(not(any(test, feature = "test-support")))]
537        const COMPRESSION_LEVEL: i32 = 4;
538
539        match message {
540            Message::Envelope(message) => {
541                self.encoding_buffer.reserve(message.encoded_len());
542                message
543                    .encode(&mut self.encoding_buffer)
544                    .map_err(io::Error::from)?;
545                let buffer =
546                    zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
547                        .unwrap();
548
549                self.encoding_buffer.clear();
550                self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
551                self.stream.send(WebSocketMessage::Binary(buffer)).await?;
552            }
553            Message::Ping => {
554                self.stream
555                    .send(WebSocketMessage::Ping(Default::default()))
556                    .await?;
557            }
558            Message::Pong => {
559                self.stream
560                    .send(WebSocketMessage::Pong(Default::default()))
561                    .await?;
562            }
563        }
564
565        Ok(())
566    }
567}
568
569impl<S> MessageStream<S>
570where
571    S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
572{
573    pub async fn read(&mut self) -> Result<(Message, Instant), anyhow::Error> {
574        while let Some(bytes) = self.stream.next().await {
575            let received_at = Instant::now();
576            match bytes? {
577                WebSocketMessage::Binary(bytes) => {
578                    zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
579                    let envelope = Envelope::decode(self.encoding_buffer.as_slice())
580                        .map_err(io::Error::from)?;
581
582                    self.encoding_buffer.clear();
583                    self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
584                    return Ok((Message::Envelope(envelope), received_at));
585                }
586                WebSocketMessage::Ping(_) => return Ok((Message::Ping, received_at)),
587                WebSocketMessage::Pong(_) => return Ok((Message::Pong, received_at)),
588                WebSocketMessage::Close(_) => break,
589                _ => {}
590            }
591        }
592        Err(anyhow!("connection closed"))
593    }
594}
595
596impl From<Timestamp> for SystemTime {
597    fn from(val: Timestamp) -> Self {
598        UNIX_EPOCH
599            .checked_add(Duration::new(val.seconds, val.nanos))
600            .unwrap()
601    }
602}
603
604impl From<SystemTime> for Timestamp {
605    fn from(time: SystemTime) -> Self {
606        let duration = time.duration_since(UNIX_EPOCH).unwrap();
607        Self {
608            seconds: duration.as_secs(),
609            nanos: duration.subsec_nanos(),
610        }
611    }
612}
613
614impl From<u128> for Nonce {
615    fn from(nonce: u128) -> Self {
616        let upper_half = (nonce >> 64) as u64;
617        let lower_half = nonce as u64;
618        Self {
619            upper_half,
620            lower_half,
621        }
622    }
623}
624
625impl From<Nonce> for u128 {
626    fn from(nonce: Nonce) -> Self {
627        let upper_half = (nonce.upper_half as u128) << 64;
628        let lower_half = nonce.lower_half as u128;
629        upper_half | lower_half
630    }
631}
632
633pub fn split_worktree_update(
634    mut message: UpdateWorktree,
635    max_chunk_size: usize,
636) -> impl Iterator<Item = UpdateWorktree> {
637    let mut done_files = false;
638
639    let mut repository_map = message
640        .updated_repositories
641        .into_iter()
642        .map(|repo| (repo.work_directory_id, repo))
643        .collect::<HashMap<_, _>>();
644
645    iter::from_fn(move || {
646        if done_files {
647            return None;
648        }
649
650        let updated_entries_chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
651        let updated_entries: Vec<_> = message
652            .updated_entries
653            .drain(..updated_entries_chunk_size)
654            .collect();
655
656        let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size);
657        let removed_entries = message
658            .removed_entries
659            .drain(..removed_entries_chunk_size)
660            .collect();
661
662        done_files = message.updated_entries.is_empty() && message.removed_entries.is_empty();
663
664        let mut updated_repositories = Vec::new();
665
666        if !repository_map.is_empty() {
667            for entry in &updated_entries {
668                if let Some(repo) = repository_map.remove(&entry.id) {
669                    updated_repositories.push(repo)
670                }
671            }
672        }
673
674        let removed_repositories = if done_files {
675            mem::take(&mut message.removed_repositories)
676        } else {
677            Default::default()
678        };
679
680        if done_files {
681            updated_repositories.extend(mem::take(&mut repository_map).into_values());
682        }
683
684        Some(UpdateWorktree {
685            project_id: message.project_id,
686            worktree_id: message.worktree_id,
687            root_name: message.root_name.clone(),
688            abs_path: message.abs_path.clone(),
689            updated_entries,
690            removed_entries,
691            scan_id: message.scan_id,
692            is_last_update: done_files && message.is_last_update,
693            updated_repositories,
694            removed_repositories,
695        })
696    })
697}
698
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an envelope message whose payload is an `UpdateWorktree` with only
    /// `root_name` populated.
    fn worktree_message(root_name: String) -> Message {
        Message::Envelope(Envelope {
            payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
                root_name,
                ..Default::default()
            })),
            ..Default::default()
        })
    }

    /// The scratch buffer must not keep more than `MAX_BUFFER_LEN` of capacity
    /// after writing or reading a message, whether small or very large.
    #[gpui::test]
    async fn test_buffer_size() {
        let (tx, rx) = futures::channel::mpsc::unbounded();

        let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
        for repetitions in [10, 1000000] {
            sink.write(worktree_message("abcdefg".repeat(repetitions)))
                .await
                .unwrap();
            assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
        }

        let mut stream = MessageStream::new(rx.map(anyhow::Ok));
        for _ in 0..2 {
            stream.read().await.unwrap();
            assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
        }
    }

    /// `PeerId::from_u64` must invert `PeerId::as_u64`, including at the `u32`
    /// boundaries of either field.
    #[gpui::test]
    fn test_converting_peer_id_from_and_to_u64() {
        let cases = [
            (10, 3),
            (u32::MAX, 3),
            (10, u32::MAX),
            (u32::MAX, u32::MAX),
        ];
        for (owner_id, id) in cases {
            let peer_id = PeerId { owner_id, id };
            assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
        }
    }
}