proto.rs

  1#![allow(non_snake_case)]
  2
  3use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
  4use anyhow::{anyhow, Result};
  5use async_tungstenite::tungstenite::Message as WebSocketMessage;
  6use collections::HashMap;
  7use futures::{SinkExt as _, StreamExt as _};
  8use prost::Message as _;
  9use serde::Serialize;
 10use std::any::{Any, TypeId};
 11use std::time::Instant;
 12use std::{
 13    cmp,
 14    fmt::Debug,
 15    io, iter,
 16    time::{Duration, SystemTime, UNIX_EPOCH},
 17};
 18use std::{fmt, mem};
 19
 20include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
 21
 22pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
 23    const NAME: &'static str;
 24    const PRIORITY: MessagePriority;
 25    fn into_envelope(
 26        self,
 27        id: u32,
 28        responding_to: Option<u32>,
 29        original_sender_id: Option<PeerId>,
 30    ) -> Envelope;
 31    fn from_envelope(envelope: Envelope) -> Option<Self>;
 32}
 33
 34pub trait EntityMessage: EnvelopedMessage {
 35    type Entity;
 36    fn remote_entity_id(&self) -> u64;
 37}
 38
 39pub trait RequestMessage: EnvelopedMessage {
 40    type Response: EnvelopedMessage;
 41}
 42
 43pub trait AnyTypedEnvelope: 'static + Send + Sync {
 44    fn payload_type_id(&self) -> TypeId;
 45    fn payload_type_name(&self) -> &'static str;
 46    fn as_any(&self) -> &dyn Any;
 47    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
 48    fn is_background(&self) -> bool;
 49    fn original_sender_id(&self) -> Option<PeerId>;
 50    fn sender_id(&self) -> ConnectionId;
 51    fn message_id(&self) -> u32;
 52}
 53
 54pub enum MessagePriority {
 55    Foreground,
 56    Background,
 57}
 58
 59impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
 60    fn payload_type_id(&self) -> TypeId {
 61        TypeId::of::<T>()
 62    }
 63
 64    fn payload_type_name(&self) -> &'static str {
 65        T::NAME
 66    }
 67
 68    fn as_any(&self) -> &dyn Any {
 69        self
 70    }
 71
 72    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
 73        self
 74    }
 75
 76    fn is_background(&self) -> bool {
 77        matches!(T::PRIORITY, MessagePriority::Background)
 78    }
 79
 80    fn original_sender_id(&self) -> Option<PeerId> {
 81        self.original_sender_id
 82    }
 83
 84    fn sender_id(&self) -> ConnectionId {
 85        self.sender_id
 86    }
 87
 88    fn message_id(&self) -> u32 {
 89        self.message_id
 90    }
 91}
 92
 93impl PeerId {
 94    pub fn from_u64(peer_id: u64) -> Self {
 95        let owner_id = (peer_id >> 32) as u32;
 96        let id = peer_id as u32;
 97        Self { owner_id, id }
 98    }
 99
100    pub fn as_u64(self) -> u64 {
101        ((self.owner_id as u64) << 32) | (self.id as u64)
102    }
103}
104
105impl Copy for PeerId {}
106
107impl Eq for PeerId {}
108
109impl Ord for PeerId {
110    fn cmp(&self, other: &Self) -> cmp::Ordering {
111        self.owner_id
112            .cmp(&other.owner_id)
113            .then_with(|| self.id.cmp(&other.id))
114    }
115}
116
117impl PartialOrd for PeerId {
118    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
119        Some(self.cmp(other))
120    }
121}
122
123impl std::hash::Hash for PeerId {
124    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
125        self.owner_id.hash(state);
126        self.id.hash(state);
127    }
128}
129
130impl fmt::Display for PeerId {
131    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
132        write!(f, "{}/{}", self.owner_id, self.id)
133    }
134}
135
136messages!(
137    (Ack, Foreground),
138    (AckBufferOperation, Background),
139    (AckChannelMessage, Background),
140    (AddNotification, Foreground),
141    (AddProjectCollaborator, Foreground),
142    (ApplyCodeAction, Background),
143    (ApplyCodeActionResponse, Background),
144    (ApplyCompletionAdditionalEdits, Background),
145    (ApplyCompletionAdditionalEditsResponse, Background),
146    (BufferReloaded, Foreground),
147    (BufferSaved, Foreground),
148    (Call, Foreground),
149    (CallCanceled, Foreground),
150    (CancelCall, Foreground),
151    (ChannelMessageSent, Foreground),
152    (ChannelMessageUpdate, Foreground),
153    (CompleteWithLanguageModel, Background),
154    (ComputeEmbeddings, Background),
155    (ComputeEmbeddingsResponse, Background),
156    (CopyProjectEntry, Foreground),
157    (CountTokensWithLanguageModel, Background),
158    (CountTokensResponse, Background),
159    (CreateBufferForPeer, Foreground),
160    (CreateChannel, Foreground),
161    (CreateChannelResponse, Foreground),
162    (CreateProjectEntry, Foreground),
163    (CreateRoom, Foreground),
164    (CreateRoomResponse, Foreground),
165    (DeclineCall, Foreground),
166    (DeleteChannel, Foreground),
167    (DeleteNotification, Foreground),
168    (UpdateNotification, Foreground),
169    (DeleteProjectEntry, Foreground),
170    (EndStream, Foreground),
171    (Error, Foreground),
172    (ExpandProjectEntry, Foreground),
173    (ExpandProjectEntryResponse, Foreground),
174    (Follow, Foreground),
175    (FollowResponse, Foreground),
176    (FormatBuffers, Foreground),
177    (FormatBuffersResponse, Foreground),
178    (FuzzySearchUsers, Foreground),
179    (GetCachedEmbeddings, Background),
180    (GetCachedEmbeddingsResponse, Background),
181    (GetChannelMembers, Foreground),
182    (GetChannelMembersResponse, Foreground),
183    (GetChannelMessages, Background),
184    (GetChannelMessagesById, Background),
185    (GetChannelMessagesResponse, Background),
186    (GetCodeActions, Background),
187    (GetCodeActionsResponse, Background),
188    (GetCompletions, Background),
189    (GetCompletionsResponse, Background),
190    (GetDefinition, Background),
191    (GetDefinitionResponse, Background),
192    (GetDocumentHighlights, Background),
193    (GetDocumentHighlightsResponse, Background),
194    (GetHover, Background),
195    (GetHoverResponse, Background),
196    (GetNotifications, Foreground),
197    (GetNotificationsResponse, Foreground),
198    (GetPrivateUserInfo, Foreground),
199    (GetPrivateUserInfoResponse, Foreground),
200    (GetProjectSymbols, Background),
201    (GetProjectSymbolsResponse, Background),
202    (GetReferences, Background),
203    (GetReferencesResponse, Background),
204    (GetTypeDefinition, Background),
205    (GetTypeDefinitionResponse, Background),
206    (GetImplementation, Background),
207    (GetImplementationResponse, Background),
208    (GetUsers, Foreground),
209    (Hello, Foreground),
210    (IncomingCall, Foreground),
211    (InlayHints, Background),
212    (InlayHintsResponse, Background),
213    (InviteChannelMember, Foreground),
214    (JoinChannel, Foreground),
215    (JoinChannelBuffer, Foreground),
216    (JoinChannelBufferResponse, Foreground),
217    (JoinChannelChat, Foreground),
218    (JoinChannelChatResponse, Foreground),
219    (JoinProject, Foreground),
220    (JoinHostedProject, Foreground),
221    (JoinProjectResponse, Foreground),
222    (JoinRoom, Foreground),
223    (JoinRoomResponse, Foreground),
224    (LanguageModelResponse, Background),
225    (LeaveChannelBuffer, Background),
226    (LeaveChannelChat, Foreground),
227    (LeaveProject, Foreground),
228    (LeaveRoom, Foreground),
229    (MarkNotificationRead, Foreground),
230    (MoveChannel, Foreground),
231    (OnTypeFormatting, Background),
232    (OnTypeFormattingResponse, Background),
233    (OpenBufferById, Background),
234    (OpenBufferByPath, Background),
235    (OpenBufferForSymbol, Background),
236    (OpenBufferForSymbolResponse, Background),
237    (OpenBufferResponse, Background),
238    (PerformRename, Background),
239    (PerformRenameResponse, Background),
240    (Ping, Foreground),
241    (PrepareRename, Background),
242    (PrepareRenameResponse, Background),
243    (ProjectEntryResponse, Foreground),
244    (RefreshInlayHints, Foreground),
245    (RejoinChannelBuffers, Foreground),
246    (RejoinChannelBuffersResponse, Foreground),
247    (RejoinRoom, Foreground),
248    (RejoinRoomResponse, Foreground),
249    (ReloadBuffers, Foreground),
250    (ReloadBuffersResponse, Foreground),
251    (RemoveChannelMember, Foreground),
252    (RemoveChannelMessage, Foreground),
253    (UpdateChannelMessage, Foreground),
254    (RemoveContact, Foreground),
255    (RemoveProjectCollaborator, Foreground),
256    (RenameChannel, Foreground),
257    (RenameChannelResponse, Foreground),
258    (RenameProjectEntry, Foreground),
259    (RequestContact, Foreground),
260    (ResolveCompletionDocumentation, Background),
261    (ResolveCompletionDocumentationResponse, Background),
262    (ResolveInlayHint, Background),
263    (ResolveInlayHintResponse, Background),
264    (RespondToChannelInvite, Foreground),
265    (RespondToContactRequest, Foreground),
266    (RoomUpdated, Foreground),
267    (SaveBuffer, Foreground),
268    (SetChannelMemberRole, Foreground),
269    (SetChannelVisibility, Foreground),
270    (SearchProject, Background),
271    (SearchProjectResponse, Background),
272    (SendChannelMessage, Background),
273    (SendChannelMessageResponse, Background),
274    (ShareProject, Foreground),
275    (ShareProjectResponse, Foreground),
276    (ShowContacts, Foreground),
277    (StartLanguageServer, Foreground),
278    (SynchronizeBuffers, Foreground),
279    (SynchronizeBuffersResponse, Foreground),
280    (Test, Foreground),
281    (Unfollow, Foreground),
282    (UnshareProject, Foreground),
283    (UpdateBuffer, Foreground),
284    (UpdateBufferFile, Foreground),
285    (UpdateChannelBuffer, Foreground),
286    (UpdateChannelBufferCollaborators, Foreground),
287    (UpdateChannels, Foreground),
288    (UpdateUserChannels, Foreground),
289    (UpdateContacts, Foreground),
290    (UpdateDiagnosticSummary, Foreground),
291    (UpdateDiffBase, Foreground),
292    (UpdateFollowers, Foreground),
293    (UpdateInviteInfo, Foreground),
294    (UpdateLanguageServer, Foreground),
295    (UpdateParticipantLocation, Foreground),
296    (UpdateProject, Foreground),
297    (UpdateProjectCollaborator, Foreground),
298    (UpdateWorktree, Foreground),
299    (UpdateWorktreeSettings, Foreground),
300    (UsersResponse, Foreground),
301    (LspExtExpandMacro, Background),
302    (LspExtExpandMacroResponse, Background),
303    (SetRoomParticipantRole, Foreground),
304    (BlameBuffer, Foreground),
305    (BlameBufferResponse, Foreground),
306    (CreateRemoteProject, Foreground),
307    (CreateRemoteProjectResponse, Foreground),
308    (CreateDevServer, Foreground),
309    (CreateDevServerResponse, Foreground),
310    (DevServerInstructions, Foreground),
311    (ShutdownDevServer, Foreground),
312    (ReconnectDevServer, Foreground),
313    (ReconnectDevServerResponse, Foreground),
314    (ShareRemoteProject, Foreground),
315    (JoinRemoteProject, Foreground),
316    (RejoinRemoteProjects, Foreground),
317    (RejoinRemoteProjectsResponse, Foreground),
318    (MultiLspQuery, Background),
319    (MultiLspQueryResponse, Background),
320);
321
322request_messages!(
323    (ApplyCodeAction, ApplyCodeActionResponse),
324    (
325        ApplyCompletionAdditionalEdits,
326        ApplyCompletionAdditionalEditsResponse
327    ),
328    (Call, Ack),
329    (CancelCall, Ack),
330    (CopyProjectEntry, ProjectEntryResponse),
331    (CompleteWithLanguageModel, LanguageModelResponse),
332    (ComputeEmbeddings, ComputeEmbeddingsResponse),
333    (CountTokensWithLanguageModel, CountTokensResponse),
334    (CreateChannel, CreateChannelResponse),
335    (CreateProjectEntry, ProjectEntryResponse),
336    (CreateRoom, CreateRoomResponse),
337    (DeclineCall, Ack),
338    (DeleteChannel, Ack),
339    (DeleteProjectEntry, ProjectEntryResponse),
340    (ExpandProjectEntry, ExpandProjectEntryResponse),
341    (Follow, FollowResponse),
342    (FormatBuffers, FormatBuffersResponse),
343    (FuzzySearchUsers, UsersResponse),
344    (GetCachedEmbeddings, GetCachedEmbeddingsResponse),
345    (GetChannelMembers, GetChannelMembersResponse),
346    (GetChannelMessages, GetChannelMessagesResponse),
347    (GetChannelMessagesById, GetChannelMessagesResponse),
348    (GetCodeActions, GetCodeActionsResponse),
349    (GetCompletions, GetCompletionsResponse),
350    (GetDefinition, GetDefinitionResponse),
351    (GetImplementation, GetImplementationResponse),
352    (GetDocumentHighlights, GetDocumentHighlightsResponse),
353    (GetHover, GetHoverResponse),
354    (GetNotifications, GetNotificationsResponse),
355    (GetPrivateUserInfo, GetPrivateUserInfoResponse),
356    (GetProjectSymbols, GetProjectSymbolsResponse),
357    (GetReferences, GetReferencesResponse),
358    (GetTypeDefinition, GetTypeDefinitionResponse),
359    (GetUsers, UsersResponse),
360    (IncomingCall, Ack),
361    (InlayHints, InlayHintsResponse),
362    (InviteChannelMember, Ack),
363    (JoinChannel, JoinRoomResponse),
364    (JoinChannelBuffer, JoinChannelBufferResponse),
365    (JoinChannelChat, JoinChannelChatResponse),
366    (JoinHostedProject, JoinProjectResponse),
367    (JoinProject, JoinProjectResponse),
368    (JoinRoom, JoinRoomResponse),
369    (LeaveChannelBuffer, Ack),
370    (LeaveRoom, Ack),
371    (MarkNotificationRead, Ack),
372    (MoveChannel, Ack),
373    (OnTypeFormatting, OnTypeFormattingResponse),
374    (OpenBufferById, OpenBufferResponse),
375    (OpenBufferByPath, OpenBufferResponse),
376    (OpenBufferForSymbol, OpenBufferForSymbolResponse),
377    (PerformRename, PerformRenameResponse),
378    (Ping, Ack),
379    (PrepareRename, PrepareRenameResponse),
380    (RefreshInlayHints, Ack),
381    (RejoinChannelBuffers, RejoinChannelBuffersResponse),
382    (RejoinRoom, RejoinRoomResponse),
383    (ReloadBuffers, ReloadBuffersResponse),
384    (RemoveChannelMember, Ack),
385    (RemoveChannelMessage, Ack),
386    (UpdateChannelMessage, Ack),
387    (RemoveContact, Ack),
388    (RenameChannel, RenameChannelResponse),
389    (RenameProjectEntry, ProjectEntryResponse),
390    (RequestContact, Ack),
391    (
392        ResolveCompletionDocumentation,
393        ResolveCompletionDocumentationResponse
394    ),
395    (ResolveInlayHint, ResolveInlayHintResponse),
396    (RespondToChannelInvite, Ack),
397    (RespondToContactRequest, Ack),
398    (SaveBuffer, BufferSaved),
399    (SearchProject, SearchProjectResponse),
400    (SendChannelMessage, SendChannelMessageResponse),
401    (SetChannelMemberRole, Ack),
402    (SetChannelVisibility, Ack),
403    (ShareProject, ShareProjectResponse),
404    (SynchronizeBuffers, SynchronizeBuffersResponse),
405    (Test, Test),
406    (UpdateBuffer, Ack),
407    (UpdateParticipantLocation, Ack),
408    (UpdateProject, Ack),
409    (UpdateWorktree, Ack),
410    (LspExtExpandMacro, LspExtExpandMacroResponse),
411    (SetRoomParticipantRole, Ack),
412    (BlameBuffer, BlameBufferResponse),
413    (CreateRemoteProject, CreateRemoteProjectResponse),
414    (CreateDevServer, CreateDevServerResponse),
415    (ShutdownDevServer, Ack),
416    (ShareRemoteProject, ShareProjectResponse),
417    (JoinRemoteProject, JoinProjectResponse),
418    (RejoinRemoteProjects, RejoinRemoteProjectsResponse),
419    (ReconnectDevServer, ReconnectDevServerResponse),
420    (MultiLspQuery, MultiLspQueryResponse),
421);
422
423entity_messages!(
424    {project_id, ShareProject},
425    AddProjectCollaborator,
426    ApplyCodeAction,
427    ApplyCompletionAdditionalEdits,
428    BlameBuffer,
429    BufferReloaded,
430    BufferSaved,
431    CopyProjectEntry,
432    CreateBufferForPeer,
433    CreateProjectEntry,
434    DeleteProjectEntry,
435    ExpandProjectEntry,
436    FormatBuffers,
437    GetCodeActions,
438    GetCompletions,
439    GetDefinition,
440    GetImplementation,
441    GetDocumentHighlights,
442    GetHover,
443    GetProjectSymbols,
444    GetReferences,
445    GetTypeDefinition,
446    InlayHints,
447    JoinProject,
448    LeaveProject,
449    MultiLspQuery,
450    OnTypeFormatting,
451    OpenBufferById,
452    OpenBufferByPath,
453    OpenBufferForSymbol,
454    PerformRename,
455    PrepareRename,
456    RefreshInlayHints,
457    ReloadBuffers,
458    RemoveProjectCollaborator,
459    RenameProjectEntry,
460    ResolveCompletionDocumentation,
461    ResolveInlayHint,
462    SaveBuffer,
463    SearchProject,
464    StartLanguageServer,
465    SynchronizeBuffers,
466    UnshareProject,
467    UpdateBuffer,
468    UpdateBufferFile,
469    UpdateDiagnosticSummary,
470    UpdateDiffBase,
471    UpdateLanguageServer,
472    UpdateProject,
473    UpdateProjectCollaborator,
474    UpdateWorktree,
475    UpdateWorktreeSettings,
476    LspExtExpandMacro,
477);
478
479entity_messages!(
480    {channel_id, Channel},
481    ChannelMessageSent,
482    ChannelMessageUpdate,
483    RemoveChannelMessage,
484    UpdateChannelMessage,
485    UpdateChannelBuffer,
486    UpdateChannelBufferCollaborators,
487);
488
489const KIB: usize = 1024;
490const MIB: usize = KIB * 1024;
491const MAX_BUFFER_LEN: usize = MIB;
492
493/// A stream of protobuf messages.
494pub struct MessageStream<S> {
495    stream: S,
496    encoding_buffer: Vec<u8>,
497}
498
499#[allow(clippy::large_enum_variant)]
500#[derive(Debug)]
501pub enum Message {
502    Envelope(Envelope),
503    Ping,
504    Pong,
505}
506
507impl<S> MessageStream<S> {
508    pub fn new(stream: S) -> Self {
509        Self {
510            stream,
511            encoding_buffer: Vec::new(),
512        }
513    }
514
515    pub fn inner_mut(&mut self) -> &mut S {
516        &mut self.stream
517    }
518}
519
520impl<S> MessageStream<S>
521where
522    S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
523{
524    pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
525        #[cfg(any(test, feature = "test-support"))]
526        const COMPRESSION_LEVEL: i32 = -7;
527
528        #[cfg(not(any(test, feature = "test-support")))]
529        const COMPRESSION_LEVEL: i32 = 4;
530
531        match message {
532            Message::Envelope(message) => {
533                self.encoding_buffer.reserve(message.encoded_len());
534                message
535                    .encode(&mut self.encoding_buffer)
536                    .map_err(io::Error::from)?;
537                let buffer =
538                    zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
539                        .unwrap();
540
541                self.encoding_buffer.clear();
542                self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
543                self.stream.send(WebSocketMessage::Binary(buffer)).await?;
544            }
545            Message::Ping => {
546                self.stream
547                    .send(WebSocketMessage::Ping(Default::default()))
548                    .await?;
549            }
550            Message::Pong => {
551                self.stream
552                    .send(WebSocketMessage::Pong(Default::default()))
553                    .await?;
554            }
555        }
556
557        Ok(())
558    }
559}
560
561impl<S> MessageStream<S>
562where
563    S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
564{
565    pub async fn read(&mut self) -> Result<(Message, Instant), anyhow::Error> {
566        while let Some(bytes) = self.stream.next().await {
567            let received_at = Instant::now();
568            match bytes? {
569                WebSocketMessage::Binary(bytes) => {
570                    zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
571                    let envelope = Envelope::decode(self.encoding_buffer.as_slice())
572                        .map_err(io::Error::from)?;
573
574                    self.encoding_buffer.clear();
575                    self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
576                    return Ok((Message::Envelope(envelope), received_at));
577                }
578                WebSocketMessage::Ping(_) => return Ok((Message::Ping, received_at)),
579                WebSocketMessage::Pong(_) => return Ok((Message::Pong, received_at)),
580                WebSocketMessage::Close(_) => break,
581                _ => {}
582            }
583        }
584        Err(anyhow!("connection closed"))
585    }
586}
587
588impl From<Timestamp> for SystemTime {
589    fn from(val: Timestamp) -> Self {
590        UNIX_EPOCH
591            .checked_add(Duration::new(val.seconds, val.nanos))
592            .unwrap()
593    }
594}
595
596impl From<SystemTime> for Timestamp {
597    fn from(time: SystemTime) -> Self {
598        let duration = time.duration_since(UNIX_EPOCH).unwrap();
599        Self {
600            seconds: duration.as_secs(),
601            nanos: duration.subsec_nanos(),
602        }
603    }
604}
605
606impl From<u128> for Nonce {
607    fn from(nonce: u128) -> Self {
608        let upper_half = (nonce >> 64) as u64;
609        let lower_half = nonce as u64;
610        Self {
611            upper_half,
612            lower_half,
613        }
614    }
615}
616
617impl From<Nonce> for u128 {
618    fn from(nonce: Nonce) -> Self {
619        let upper_half = (nonce.upper_half as u128) << 64;
620        let lower_half = nonce.lower_half as u128;
621        upper_half | lower_half
622    }
623}
624
625pub fn split_worktree_update(
626    mut message: UpdateWorktree,
627    max_chunk_size: usize,
628) -> impl Iterator<Item = UpdateWorktree> {
629    let mut done_files = false;
630
631    let mut repository_map = message
632        .updated_repositories
633        .into_iter()
634        .map(|repo| (repo.work_directory_id, repo))
635        .collect::<HashMap<_, _>>();
636
637    iter::from_fn(move || {
638        if done_files {
639            return None;
640        }
641
642        let updated_entries_chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
643        let updated_entries: Vec<_> = message
644            .updated_entries
645            .drain(..updated_entries_chunk_size)
646            .collect();
647
648        let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size);
649        let removed_entries = message
650            .removed_entries
651            .drain(..removed_entries_chunk_size)
652            .collect();
653
654        done_files = message.updated_entries.is_empty() && message.removed_entries.is_empty();
655
656        let mut updated_repositories = Vec::new();
657
658        if !repository_map.is_empty() {
659            for entry in &updated_entries {
660                if let Some(repo) = repository_map.remove(&entry.id) {
661                    updated_repositories.push(repo)
662                }
663            }
664        }
665
666        let removed_repositories = if done_files {
667            mem::take(&mut message.removed_repositories)
668        } else {
669            Default::default()
670        };
671
672        if done_files {
673            updated_repositories.extend(mem::take(&mut repository_map).into_values());
674        }
675
676        Some(UpdateWorktree {
677            project_id: message.project_id,
678            worktree_id: message.worktree_id,
679            root_name: message.root_name.clone(),
680            abs_path: message.abs_path.clone(),
681            updated_entries,
682            removed_entries,
683            scan_id: message.scan_id,
684            is_last_update: done_files && message.is_last_update,
685            updated_repositories,
686            removed_repositories,
687        })
688    })
689}
690
691#[cfg(test)]
692mod tests {
693    use super::*;
694
695    #[gpui::test]
696    async fn test_buffer_size() {
697        let (tx, rx) = futures::channel::mpsc::unbounded();
698        let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
699        sink.write(Message::Envelope(Envelope {
700            payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
701                root_name: "abcdefg".repeat(10),
702                ..Default::default()
703            })),
704            ..Default::default()
705        }))
706        .await
707        .unwrap();
708        assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
709        sink.write(Message::Envelope(Envelope {
710            payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
711                root_name: "abcdefg".repeat(1000000),
712                ..Default::default()
713            })),
714            ..Default::default()
715        }))
716        .await
717        .unwrap();
718        assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
719
720        let mut stream = MessageStream::new(rx.map(anyhow::Ok));
721        stream.read().await.unwrap();
722        assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
723        stream.read().await.unwrap();
724        assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
725    }
726
727    #[gpui::test]
728    fn test_converting_peer_id_from_and_to_u64() {
729        let peer_id = PeerId {
730            owner_id: 10,
731            id: 3,
732        };
733        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
734        let peer_id = PeerId {
735            owner_id: u32::MAX,
736            id: 3,
737        };
738        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
739        let peer_id = PeerId {
740            owner_id: 10,
741            id: u32::MAX,
742        };
743        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
744        let peer_id = PeerId {
745            owner_id: u32::MAX,
746            id: u32::MAX,
747        };
748        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
749    }
750}