proto.rs

  1#![allow(non_snake_case)]
  2
  3use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
  4use anyhow::{anyhow, Result};
  5use async_tungstenite::tungstenite::Message as WebSocketMessage;
  6use collections::HashMap;
  7use futures::{SinkExt as _, StreamExt as _};
  8use prost::Message as _;
  9use serde::Serialize;
 10use std::any::{Any, TypeId};
 11use std::time::Instant;
 12use std::{
 13    cmp,
 14    fmt::Debug,
 15    io, iter,
 16    time::{Duration, SystemTime, UNIX_EPOCH},
 17};
 18use std::{fmt, mem};
 19
 20include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
 21
 22pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
 23    const NAME: &'static str;
 24    const PRIORITY: MessagePriority;
 25    fn into_envelope(
 26        self,
 27        id: u32,
 28        responding_to: Option<u32>,
 29        original_sender_id: Option<PeerId>,
 30    ) -> Envelope;
 31    fn from_envelope(envelope: Envelope) -> Option<Self>;
 32}
 33
 34pub trait EntityMessage: EnvelopedMessage {
 35    type Entity;
 36    fn remote_entity_id(&self) -> u64;
 37}
 38
 39pub trait RequestMessage: EnvelopedMessage {
 40    type Response: EnvelopedMessage;
 41}
 42
 43pub trait AnyTypedEnvelope: 'static + Send + Sync {
 44    fn payload_type_id(&self) -> TypeId;
 45    fn payload_type_name(&self) -> &'static str;
 46    fn as_any(&self) -> &dyn Any;
 47    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
 48    fn is_background(&self) -> bool;
 49    fn original_sender_id(&self) -> Option<PeerId>;
 50    fn sender_id(&self) -> ConnectionId;
 51    fn message_id(&self) -> u32;
 52}
 53
 54pub enum MessagePriority {
 55    Foreground,
 56    Background,
 57}
 58
 59impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
 60    fn payload_type_id(&self) -> TypeId {
 61        TypeId::of::<T>()
 62    }
 63
 64    fn payload_type_name(&self) -> &'static str {
 65        T::NAME
 66    }
 67
 68    fn as_any(&self) -> &dyn Any {
 69        self
 70    }
 71
 72    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
 73        self
 74    }
 75
 76    fn is_background(&self) -> bool {
 77        matches!(T::PRIORITY, MessagePriority::Background)
 78    }
 79
 80    fn original_sender_id(&self) -> Option<PeerId> {
 81        self.original_sender_id
 82    }
 83
 84    fn sender_id(&self) -> ConnectionId {
 85        self.sender_id
 86    }
 87
 88    fn message_id(&self) -> u32 {
 89        self.message_id
 90    }
 91}
 92
 93impl PeerId {
 94    pub fn from_u64(peer_id: u64) -> Self {
 95        let owner_id = (peer_id >> 32) as u32;
 96        let id = peer_id as u32;
 97        Self { owner_id, id }
 98    }
 99
100    pub fn as_u64(self) -> u64 {
101        ((self.owner_id as u64) << 32) | (self.id as u64)
102    }
103}
104
105impl Copy for PeerId {}
106
107impl Eq for PeerId {}
108
109impl Ord for PeerId {
110    fn cmp(&self, other: &Self) -> cmp::Ordering {
111        self.owner_id
112            .cmp(&other.owner_id)
113            .then_with(|| self.id.cmp(&other.id))
114    }
115}
116
117impl PartialOrd for PeerId {
118    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
119        Some(self.cmp(other))
120    }
121}
122
123impl std::hash::Hash for PeerId {
124    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
125        self.owner_id.hash(state);
126        self.id.hash(state);
127    }
128}
129
130impl fmt::Display for PeerId {
131    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
132        write!(f, "{}/{}", self.owner_id, self.id)
133    }
134}
135
136messages!(
137    (Ack, Foreground),
138    (AckBufferOperation, Background),
139    (AckChannelMessage, Background),
140    (AddNotification, Foreground),
141    (AddProjectCollaborator, Foreground),
142    (ApplyCodeAction, Background),
143    (ApplyCodeActionResponse, Background),
144    (ApplyCompletionAdditionalEdits, Background),
145    (ApplyCompletionAdditionalEditsResponse, Background),
146    (BufferReloaded, Foreground),
147    (BufferSaved, Foreground),
148    (Call, Foreground),
149    (CallCanceled, Foreground),
150    (CancelCall, Foreground),
151    (ChannelMessageSent, Foreground),
152    (ChannelMessageUpdate, Foreground),
153    (CompleteWithLanguageModel, Background),
154    (ComputeEmbeddings, Background),
155    (ComputeEmbeddingsResponse, Background),
156    (CopyProjectEntry, Foreground),
157    (CountTokensWithLanguageModel, Background),
158    (CountTokensResponse, Background),
159    (CreateBufferForPeer, Foreground),
160    (CreateChannel, Foreground),
161    (CreateChannelResponse, Foreground),
162    (CreateProjectEntry, Foreground),
163    (CreateRoom, Foreground),
164    (CreateRoomResponse, Foreground),
165    (DeclineCall, Foreground),
166    (DeleteChannel, Foreground),
167    (DeleteNotification, Foreground),
168    (UpdateNotification, Foreground),
169    (DeleteProjectEntry, Foreground),
170    (EndStream, Foreground),
171    (Error, Foreground),
172    (ExpandProjectEntry, Foreground),
173    (ExpandProjectEntryResponse, Foreground),
174    (Follow, Foreground),
175    (FollowResponse, Foreground),
176    (FormatBuffers, Foreground),
177    (FormatBuffersResponse, Foreground),
178    (FuzzySearchUsers, Foreground),
179    (GetCachedEmbeddings, Background),
180    (GetCachedEmbeddingsResponse, Background),
181    (GetChannelMembers, Foreground),
182    (GetChannelMembersResponse, Foreground),
183    (GetChannelMessages, Background),
184    (GetChannelMessagesById, Background),
185    (GetChannelMessagesResponse, Background),
186    (GetCodeActions, Background),
187    (GetCodeActionsResponse, Background),
188    (GetCompletions, Background),
189    (GetCompletionsResponse, Background),
190    (GetDefinition, Background),
191    (GetDefinitionResponse, Background),
192    (GetDocumentHighlights, Background),
193    (GetDocumentHighlightsResponse, Background),
194    (GetHover, Background),
195    (GetHoverResponse, Background),
196    (GetNotifications, Foreground),
197    (GetNotificationsResponse, Foreground),
198    (GetPrivateUserInfo, Foreground),
199    (GetPrivateUserInfoResponse, Foreground),
200    (GetProjectSymbols, Background),
201    (GetProjectSymbolsResponse, Background),
202    (GetReferences, Background),
203    (GetReferencesResponse, Background),
204    (GetSupermavenApiKey, Background),
205    (GetSupermavenApiKeyResponse, Background),
206    (GetTypeDefinition, Background),
207    (GetTypeDefinitionResponse, Background),
208    (GetImplementation, Background),
209    (GetImplementationResponse, Background),
210    (GetUsers, Foreground),
211    (Hello, Foreground),
212    (IncomingCall, Foreground),
213    (InlayHints, Background),
214    (InlayHintsResponse, Background),
215    (InviteChannelMember, Foreground),
216    (JoinChannel, Foreground),
217    (JoinChannelBuffer, Foreground),
218    (JoinChannelBufferResponse, Foreground),
219    (JoinChannelChat, Foreground),
220    (JoinChannelChatResponse, Foreground),
221    (JoinProject, Foreground),
222    (JoinHostedProject, Foreground),
223    (JoinProjectResponse, Foreground),
224    (JoinRoom, Foreground),
225    (JoinRoomResponse, Foreground),
226    (LanguageModelResponse, Background),
227    (LeaveChannelBuffer, Background),
228    (LeaveChannelChat, Foreground),
229    (LeaveProject, Foreground),
230    (LeaveRoom, Foreground),
231    (MarkNotificationRead, Foreground),
232    (MoveChannel, Foreground),
233    (OnTypeFormatting, Background),
234    (OnTypeFormattingResponse, Background),
235    (OpenBufferById, Background),
236    (OpenBufferByPath, Background),
237    (OpenBufferForSymbol, Background),
238    (OpenBufferForSymbolResponse, Background),
239    (OpenBufferResponse, Background),
240    (PerformRename, Background),
241    (PerformRenameResponse, Background),
242    (Ping, Foreground),
243    (PrepareRename, Background),
244    (PrepareRenameResponse, Background),
245    (ProjectEntryResponse, Foreground),
246    (RefreshInlayHints, Foreground),
247    (RejoinChannelBuffers, Foreground),
248    (RejoinChannelBuffersResponse, Foreground),
249    (RejoinRoom, Foreground),
250    (RejoinRoomResponse, Foreground),
251    (ReloadBuffers, Foreground),
252    (ReloadBuffersResponse, Foreground),
253    (RemoveChannelMember, Foreground),
254    (RemoveChannelMessage, Foreground),
255    (UpdateChannelMessage, Foreground),
256    (RemoveContact, Foreground),
257    (RemoveProjectCollaborator, Foreground),
258    (RenameChannel, Foreground),
259    (RenameChannelResponse, Foreground),
260    (RenameProjectEntry, Foreground),
261    (RequestContact, Foreground),
262    (ResolveCompletionDocumentation, Background),
263    (ResolveCompletionDocumentationResponse, Background),
264    (ResolveInlayHint, Background),
265    (ResolveInlayHintResponse, Background),
266    (RespondToChannelInvite, Foreground),
267    (RespondToContactRequest, Foreground),
268    (RoomUpdated, Foreground),
269    (SaveBuffer, Foreground),
270    (SetChannelMemberRole, Foreground),
271    (SetChannelVisibility, Foreground),
272    (SearchProject, Background),
273    (SearchProjectResponse, Background),
274    (SendChannelMessage, Background),
275    (SendChannelMessageResponse, Background),
276    (ShareProject, Foreground),
277    (ShareProjectResponse, Foreground),
278    (ShowContacts, Foreground),
279    (StartLanguageServer, Foreground),
280    (SynchronizeBuffers, Foreground),
281    (SynchronizeBuffersResponse, Foreground),
282    (Test, Foreground),
283    (Unfollow, Foreground),
284    (UnshareProject, Foreground),
285    (UpdateBuffer, Foreground),
286    (UpdateBufferFile, Foreground),
287    (UpdateChannelBuffer, Foreground),
288    (UpdateChannelBufferCollaborators, Foreground),
289    (UpdateChannels, Foreground),
290    (UpdateUserChannels, Foreground),
291    (UpdateContacts, Foreground),
292    (UpdateDiagnosticSummary, Foreground),
293    (UpdateDiffBase, Foreground),
294    (UpdateFollowers, Foreground),
295    (UpdateInviteInfo, Foreground),
296    (UpdateLanguageServer, Foreground),
297    (UpdateParticipantLocation, Foreground),
298    (UpdateProject, Foreground),
299    (UpdateProjectCollaborator, Foreground),
300    (UpdateWorktree, Foreground),
301    (UpdateWorktreeSettings, Foreground),
302    (UsersResponse, Foreground),
303    (LspExtExpandMacro, Background),
304    (LspExtExpandMacroResponse, Background),
305    (SetRoomParticipantRole, Foreground),
306    (BlameBuffer, Foreground),
307    (BlameBufferResponse, Foreground),
308    (CreateDevServerProject, Background),
309    (CreateDevServerProjectResponse, Foreground),
310    (CreateDevServer, Foreground),
311    (CreateDevServerResponse, Foreground),
312    (DevServerInstructions, Foreground),
313    (ShutdownDevServer, Foreground),
314    (ReconnectDevServer, Foreground),
315    (ReconnectDevServerResponse, Foreground),
316    (ShareDevServerProject, Foreground),
317    (JoinDevServerProject, Foreground),
318    (RejoinRemoteProjects, Foreground),
319    (RejoinRemoteProjectsResponse, Foreground),
320    (MultiLspQuery, Background),
321    (MultiLspQueryResponse, Background),
322    (DevServerProjectsUpdate, Foreground),
323    (ValidateDevServerProjectRequest, Background),
324    (DeleteDevServer, Foreground),
325    (DeleteDevServerProject, Foreground),
326    (OpenNewBuffer, Foreground)
327);
328
329request_messages!(
330    (ApplyCodeAction, ApplyCodeActionResponse),
331    (
332        ApplyCompletionAdditionalEdits,
333        ApplyCompletionAdditionalEditsResponse
334    ),
335    (Call, Ack),
336    (CancelCall, Ack),
337    (CopyProjectEntry, ProjectEntryResponse),
338    (CompleteWithLanguageModel, LanguageModelResponse),
339    (ComputeEmbeddings, ComputeEmbeddingsResponse),
340    (CountTokensWithLanguageModel, CountTokensResponse),
341    (CreateChannel, CreateChannelResponse),
342    (CreateProjectEntry, ProjectEntryResponse),
343    (CreateRoom, CreateRoomResponse),
344    (DeclineCall, Ack),
345    (DeleteChannel, Ack),
346    (DeleteProjectEntry, ProjectEntryResponse),
347    (ExpandProjectEntry, ExpandProjectEntryResponse),
348    (Follow, FollowResponse),
349    (FormatBuffers, FormatBuffersResponse),
350    (FuzzySearchUsers, UsersResponse),
351    (GetCachedEmbeddings, GetCachedEmbeddingsResponse),
352    (GetChannelMembers, GetChannelMembersResponse),
353    (GetChannelMessages, GetChannelMessagesResponse),
354    (GetChannelMessagesById, GetChannelMessagesResponse),
355    (GetCodeActions, GetCodeActionsResponse),
356    (GetCompletions, GetCompletionsResponse),
357    (GetDefinition, GetDefinitionResponse),
358    (GetImplementation, GetImplementationResponse),
359    (GetDocumentHighlights, GetDocumentHighlightsResponse),
360    (GetHover, GetHoverResponse),
361    (GetNotifications, GetNotificationsResponse),
362    (GetPrivateUserInfo, GetPrivateUserInfoResponse),
363    (GetProjectSymbols, GetProjectSymbolsResponse),
364    (GetReferences, GetReferencesResponse),
365    (GetSupermavenApiKey, GetSupermavenApiKeyResponse),
366    (GetTypeDefinition, GetTypeDefinitionResponse),
367    (GetUsers, UsersResponse),
368    (IncomingCall, Ack),
369    (InlayHints, InlayHintsResponse),
370    (InviteChannelMember, Ack),
371    (JoinChannel, JoinRoomResponse),
372    (JoinChannelBuffer, JoinChannelBufferResponse),
373    (JoinChannelChat, JoinChannelChatResponse),
374    (JoinHostedProject, JoinProjectResponse),
375    (JoinProject, JoinProjectResponse),
376    (JoinRoom, JoinRoomResponse),
377    (LeaveChannelBuffer, Ack),
378    (LeaveRoom, Ack),
379    (MarkNotificationRead, Ack),
380    (MoveChannel, Ack),
381    (OnTypeFormatting, OnTypeFormattingResponse),
382    (OpenBufferById, OpenBufferResponse),
383    (OpenBufferByPath, OpenBufferResponse),
384    (OpenBufferForSymbol, OpenBufferForSymbolResponse),
385    (OpenNewBuffer, OpenBufferResponse),
386    (PerformRename, PerformRenameResponse),
387    (Ping, Ack),
388    (PrepareRename, PrepareRenameResponse),
389    (RefreshInlayHints, Ack),
390    (RejoinChannelBuffers, RejoinChannelBuffersResponse),
391    (RejoinRoom, RejoinRoomResponse),
392    (ReloadBuffers, ReloadBuffersResponse),
393    (RemoveChannelMember, Ack),
394    (RemoveChannelMessage, Ack),
395    (UpdateChannelMessage, Ack),
396    (RemoveContact, Ack),
397    (RenameChannel, RenameChannelResponse),
398    (RenameProjectEntry, ProjectEntryResponse),
399    (RequestContact, Ack),
400    (
401        ResolveCompletionDocumentation,
402        ResolveCompletionDocumentationResponse
403    ),
404    (ResolveInlayHint, ResolveInlayHintResponse),
405    (RespondToChannelInvite, Ack),
406    (RespondToContactRequest, Ack),
407    (SaveBuffer, BufferSaved),
408    (SearchProject, SearchProjectResponse),
409    (SendChannelMessage, SendChannelMessageResponse),
410    (SetChannelMemberRole, Ack),
411    (SetChannelVisibility, Ack),
412    (ShareProject, ShareProjectResponse),
413    (SynchronizeBuffers, SynchronizeBuffersResponse),
414    (Test, Test),
415    (UpdateBuffer, Ack),
416    (UpdateParticipantLocation, Ack),
417    (UpdateProject, Ack),
418    (UpdateWorktree, Ack),
419    (LspExtExpandMacro, LspExtExpandMacroResponse),
420    (SetRoomParticipantRole, Ack),
421    (BlameBuffer, BlameBufferResponse),
422    (CreateDevServerProject, CreateDevServerProjectResponse),
423    (CreateDevServer, CreateDevServerResponse),
424    (ShutdownDevServer, Ack),
425    (ShareDevServerProject, ShareProjectResponse),
426    (JoinDevServerProject, JoinProjectResponse),
427    (RejoinRemoteProjects, RejoinRemoteProjectsResponse),
428    (ReconnectDevServer, ReconnectDevServerResponse),
429    (ValidateDevServerProjectRequest, Ack),
430    (MultiLspQuery, MultiLspQueryResponse),
431    (DeleteDevServer, Ack),
432    (DeleteDevServerProject, Ack),
433);
434
435entity_messages!(
436    {project_id, ShareProject},
437    AddProjectCollaborator,
438    ApplyCodeAction,
439    ApplyCompletionAdditionalEdits,
440    BlameBuffer,
441    BufferReloaded,
442    BufferSaved,
443    CopyProjectEntry,
444    CreateBufferForPeer,
445    CreateProjectEntry,
446    DeleteProjectEntry,
447    ExpandProjectEntry,
448    FormatBuffers,
449    GetCodeActions,
450    GetCompletions,
451    GetDefinition,
452    GetImplementation,
453    GetDocumentHighlights,
454    GetHover,
455    GetProjectSymbols,
456    GetReferences,
457    GetTypeDefinition,
458    InlayHints,
459    JoinProject,
460    LeaveProject,
461    MultiLspQuery,
462    OnTypeFormatting,
463    OpenNewBuffer,
464    OpenBufferById,
465    OpenBufferByPath,
466    OpenBufferForSymbol,
467    PerformRename,
468    PrepareRename,
469    RefreshInlayHints,
470    ReloadBuffers,
471    RemoveProjectCollaborator,
472    RenameProjectEntry,
473    ResolveCompletionDocumentation,
474    ResolveInlayHint,
475    SaveBuffer,
476    SearchProject,
477    StartLanguageServer,
478    SynchronizeBuffers,
479    UnshareProject,
480    UpdateBuffer,
481    UpdateBufferFile,
482    UpdateDiagnosticSummary,
483    UpdateDiffBase,
484    UpdateLanguageServer,
485    UpdateProject,
486    UpdateProjectCollaborator,
487    UpdateWorktree,
488    UpdateWorktreeSettings,
489    LspExtExpandMacro,
490);
491
492entity_messages!(
493    {channel_id, Channel},
494    ChannelMessageSent,
495    ChannelMessageUpdate,
496    RemoveChannelMessage,
497    UpdateChannelMessage,
498    UpdateChannelBuffer,
499    UpdateChannelBufferCollaborators,
500);
501
502const KIB: usize = 1024;
503const MIB: usize = KIB * 1024;
504const MAX_BUFFER_LEN: usize = MIB;
505
506/// A stream of protobuf messages.
507pub struct MessageStream<S> {
508    stream: S,
509    encoding_buffer: Vec<u8>,
510}
511
512#[allow(clippy::large_enum_variant)]
513#[derive(Debug)]
514pub enum Message {
515    Envelope(Envelope),
516    Ping,
517    Pong,
518}
519
520impl<S> MessageStream<S> {
521    pub fn new(stream: S) -> Self {
522        Self {
523            stream,
524            encoding_buffer: Vec::new(),
525        }
526    }
527
528    pub fn inner_mut(&mut self) -> &mut S {
529        &mut self.stream
530    }
531}
532
533impl<S> MessageStream<S>
534where
535    S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
536{
537    pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
538        #[cfg(any(test, feature = "test-support"))]
539        const COMPRESSION_LEVEL: i32 = -7;
540
541        #[cfg(not(any(test, feature = "test-support")))]
542        const COMPRESSION_LEVEL: i32 = 4;
543
544        match message {
545            Message::Envelope(message) => {
546                self.encoding_buffer.reserve(message.encoded_len());
547                message
548                    .encode(&mut self.encoding_buffer)
549                    .map_err(io::Error::from)?;
550                let buffer =
551                    zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
552                        .unwrap();
553
554                self.encoding_buffer.clear();
555                self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
556                self.stream.send(WebSocketMessage::Binary(buffer)).await?;
557            }
558            Message::Ping => {
559                self.stream
560                    .send(WebSocketMessage::Ping(Default::default()))
561                    .await?;
562            }
563            Message::Pong => {
564                self.stream
565                    .send(WebSocketMessage::Pong(Default::default()))
566                    .await?;
567            }
568        }
569
570        Ok(())
571    }
572}
573
574impl<S> MessageStream<S>
575where
576    S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
577{
578    pub async fn read(&mut self) -> Result<(Message, Instant), anyhow::Error> {
579        while let Some(bytes) = self.stream.next().await {
580            let received_at = Instant::now();
581            match bytes? {
582                WebSocketMessage::Binary(bytes) => {
583                    zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
584                    let envelope = Envelope::decode(self.encoding_buffer.as_slice())
585                        .map_err(io::Error::from)?;
586
587                    self.encoding_buffer.clear();
588                    self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
589                    return Ok((Message::Envelope(envelope), received_at));
590                }
591                WebSocketMessage::Ping(_) => return Ok((Message::Ping, received_at)),
592                WebSocketMessage::Pong(_) => return Ok((Message::Pong, received_at)),
593                WebSocketMessage::Close(_) => break,
594                _ => {}
595            }
596        }
597        Err(anyhow!("connection closed"))
598    }
599}
600
601impl From<Timestamp> for SystemTime {
602    fn from(val: Timestamp) -> Self {
603        UNIX_EPOCH
604            .checked_add(Duration::new(val.seconds, val.nanos))
605            .unwrap()
606    }
607}
608
609impl From<SystemTime> for Timestamp {
610    fn from(time: SystemTime) -> Self {
611        let duration = time.duration_since(UNIX_EPOCH).unwrap();
612        Self {
613            seconds: duration.as_secs(),
614            nanos: duration.subsec_nanos(),
615        }
616    }
617}
618
619impl From<u128> for Nonce {
620    fn from(nonce: u128) -> Self {
621        let upper_half = (nonce >> 64) as u64;
622        let lower_half = nonce as u64;
623        Self {
624            upper_half,
625            lower_half,
626        }
627    }
628}
629
630impl From<Nonce> for u128 {
631    fn from(nonce: Nonce) -> Self {
632        let upper_half = (nonce.upper_half as u128) << 64;
633        let lower_half = nonce.lower_half as u128;
634        upper_half | lower_half
635    }
636}
637
638pub fn split_worktree_update(
639    mut message: UpdateWorktree,
640    max_chunk_size: usize,
641) -> impl Iterator<Item = UpdateWorktree> {
642    let mut done_files = false;
643
644    let mut repository_map = message
645        .updated_repositories
646        .into_iter()
647        .map(|repo| (repo.work_directory_id, repo))
648        .collect::<HashMap<_, _>>();
649
650    iter::from_fn(move || {
651        if done_files {
652            return None;
653        }
654
655        let updated_entries_chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
656        let updated_entries: Vec<_> = message
657            .updated_entries
658            .drain(..updated_entries_chunk_size)
659            .collect();
660
661        let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size);
662        let removed_entries = message
663            .removed_entries
664            .drain(..removed_entries_chunk_size)
665            .collect();
666
667        done_files = message.updated_entries.is_empty() && message.removed_entries.is_empty();
668
669        let mut updated_repositories = Vec::new();
670
671        if !repository_map.is_empty() {
672            for entry in &updated_entries {
673                if let Some(repo) = repository_map.remove(&entry.id) {
674                    updated_repositories.push(repo)
675                }
676            }
677        }
678
679        let removed_repositories = if done_files {
680            mem::take(&mut message.removed_repositories)
681        } else {
682            Default::default()
683        };
684
685        if done_files {
686            updated_repositories.extend(mem::take(&mut repository_map).into_values());
687        }
688
689        Some(UpdateWorktree {
690            project_id: message.project_id,
691            worktree_id: message.worktree_id,
692            root_name: message.root_name.clone(),
693            abs_path: message.abs_path.clone(),
694            updated_entries,
695            removed_entries,
696            scan_id: message.scan_id,
697            is_last_update: done_files && message.is_last_update,
698            updated_repositories,
699            removed_repositories,
700        })
701    })
702}
703
704#[cfg(test)]
705mod tests {
706    use super::*;
707
708    #[gpui::test]
709    async fn test_buffer_size() {
710        let (tx, rx) = futures::channel::mpsc::unbounded();
711        let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
712        sink.write(Message::Envelope(Envelope {
713            payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
714                root_name: "abcdefg".repeat(10),
715                ..Default::default()
716            })),
717            ..Default::default()
718        }))
719        .await
720        .unwrap();
721        assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
722        sink.write(Message::Envelope(Envelope {
723            payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
724                root_name: "abcdefg".repeat(1000000),
725                ..Default::default()
726            })),
727            ..Default::default()
728        }))
729        .await
730        .unwrap();
731        assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
732
733        let mut stream = MessageStream::new(rx.map(anyhow::Ok));
734        stream.read().await.unwrap();
735        assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
736        stream.read().await.unwrap();
737        assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
738    }
739
740    #[gpui::test]
741    fn test_converting_peer_id_from_and_to_u64() {
742        let peer_id = PeerId {
743            owner_id: 10,
744            id: 3,
745        };
746        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
747        let peer_id = PeerId {
748            owner_id: u32::MAX,
749            id: 3,
750        };
751        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
752        let peer_id = PeerId {
753            owner_id: 10,
754            id: u32::MAX,
755        };
756        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
757        let peer_id = PeerId {
758            owner_id: u32::MAX,
759            id: u32::MAX,
760        };
761        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
762    }
763}