proto.rs

  1#![allow(non_snake_case)]
  2
  3use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
  4use anyhow::{anyhow, Result};
  5use async_tungstenite::tungstenite::Message as WebSocketMessage;
  6use collections::HashMap;
  7use futures::{SinkExt as _, StreamExt as _};
  8use prost::Message as _;
  9use serde::Serialize;
 10use std::any::{Any, TypeId};
 11use std::time::Instant;
 12use std::{
 13    cmp,
 14    fmt::Debug,
 15    io, iter,
 16    time::{Duration, SystemTime, UNIX_EPOCH},
 17};
 18use std::{fmt, mem};
 19
 20include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
 21
 22pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
 23    const NAME: &'static str;
 24    const PRIORITY: MessagePriority;
 25    fn into_envelope(
 26        self,
 27        id: u32,
 28        responding_to: Option<u32>,
 29        original_sender_id: Option<PeerId>,
 30    ) -> Envelope;
 31    fn from_envelope(envelope: Envelope) -> Option<Self>;
 32}
 33
 34pub trait EntityMessage: EnvelopedMessage {
 35    type Entity;
 36    fn remote_entity_id(&self) -> u64;
 37}
 38
 39pub trait RequestMessage: EnvelopedMessage {
 40    type Response: EnvelopedMessage;
 41}
 42
 43pub trait AnyTypedEnvelope: 'static + Send + Sync {
 44    fn payload_type_id(&self) -> TypeId;
 45    fn payload_type_name(&self) -> &'static str;
 46    fn as_any(&self) -> &dyn Any;
 47    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
 48    fn is_background(&self) -> bool;
 49    fn original_sender_id(&self) -> Option<PeerId>;
 50    fn sender_id(&self) -> ConnectionId;
 51    fn message_id(&self) -> u32;
 52}
 53
 54pub enum MessagePriority {
 55    Foreground,
 56    Background,
 57}
 58
 59impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
 60    fn payload_type_id(&self) -> TypeId {
 61        TypeId::of::<T>()
 62    }
 63
 64    fn payload_type_name(&self) -> &'static str {
 65        T::NAME
 66    }
 67
 68    fn as_any(&self) -> &dyn Any {
 69        self
 70    }
 71
 72    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
 73        self
 74    }
 75
 76    fn is_background(&self) -> bool {
 77        matches!(T::PRIORITY, MessagePriority::Background)
 78    }
 79
 80    fn original_sender_id(&self) -> Option<PeerId> {
 81        self.original_sender_id
 82    }
 83
 84    fn sender_id(&self) -> ConnectionId {
 85        self.sender_id
 86    }
 87
 88    fn message_id(&self) -> u32 {
 89        self.message_id
 90    }
 91}
 92
 93impl PeerId {
 94    pub fn from_u64(peer_id: u64) -> Self {
 95        let owner_id = (peer_id >> 32) as u32;
 96        let id = peer_id as u32;
 97        Self { owner_id, id }
 98    }
 99
100    pub fn as_u64(self) -> u64 {
101        ((self.owner_id as u64) << 32) | (self.id as u64)
102    }
103}
104
105impl Copy for PeerId {}
106
107impl Eq for PeerId {}
108
109impl Ord for PeerId {
110    fn cmp(&self, other: &Self) -> cmp::Ordering {
111        self.owner_id
112            .cmp(&other.owner_id)
113            .then_with(|| self.id.cmp(&other.id))
114    }
115}
116
117impl PartialOrd for PeerId {
118    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
119        Some(self.cmp(other))
120    }
121}
122
123impl std::hash::Hash for PeerId {
124    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
125        self.owner_id.hash(state);
126        self.id.hash(state);
127    }
128}
129
130impl fmt::Display for PeerId {
131    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
132        write!(f, "{}/{}", self.owner_id, self.id)
133    }
134}
135
136messages!(
137    (Ack, Foreground),
138    (AckBufferOperation, Background),
139    (AckChannelMessage, Background),
140    (AddNotification, Foreground),
141    (AddProjectCollaborator, Foreground),
142    (ApplyCodeAction, Background),
143    (ApplyCodeActionResponse, Background),
144    (ApplyCompletionAdditionalEdits, Background),
145    (ApplyCompletionAdditionalEditsResponse, Background),
146    (BufferReloaded, Foreground),
147    (BufferSaved, Foreground),
148    (Call, Foreground),
149    (CallCanceled, Foreground),
150    (CancelCall, Foreground),
151    (ChannelMessageSent, Foreground),
152    (ChannelMessageUpdate, Foreground),
153    (CompleteWithLanguageModel, Background),
154    (CopyProjectEntry, Foreground),
155    (CountTokensWithLanguageModel, Background),
156    (CountTokensResponse, Background),
157    (CreateBufferForPeer, Foreground),
158    (CreateChannel, Foreground),
159    (CreateChannelResponse, Foreground),
160    (CreateProjectEntry, Foreground),
161    (CreateRoom, Foreground),
162    (CreateRoomResponse, Foreground),
163    (DeclineCall, Foreground),
164    (DeleteChannel, Foreground),
165    (DeleteNotification, Foreground),
166    (UpdateNotification, Foreground),
167    (DeleteProjectEntry, Foreground),
168    (EndStream, Foreground),
169    (Error, Foreground),
170    (ExpandProjectEntry, Foreground),
171    (ExpandProjectEntryResponse, Foreground),
172    (Follow, Foreground),
173    (FollowResponse, Foreground),
174    (FormatBuffers, Foreground),
175    (FormatBuffersResponse, Foreground),
176    (FuzzySearchUsers, Foreground),
177    (GetChannelMembers, Foreground),
178    (GetChannelMembersResponse, Foreground),
179    (GetChannelMessages, Background),
180    (GetChannelMessagesById, Background),
181    (GetChannelMessagesResponse, Background),
182    (GetCodeActions, Background),
183    (GetCodeActionsResponse, Background),
184    (GetCompletions, Background),
185    (GetCompletionsResponse, Background),
186    (GetDefinition, Background),
187    (GetDefinitionResponse, Background),
188    (GetDocumentHighlights, Background),
189    (GetDocumentHighlightsResponse, Background),
190    (GetHover, Background),
191    (GetHoverResponse, Background),
192    (GetNotifications, Foreground),
193    (GetNotificationsResponse, Foreground),
194    (GetPrivateUserInfo, Foreground),
195    (GetPrivateUserInfoResponse, Foreground),
196    (GetProjectSymbols, Background),
197    (GetProjectSymbolsResponse, Background),
198    (GetReferences, Background),
199    (GetReferencesResponse, Background),
200    (GetTypeDefinition, Background),
201    (GetTypeDefinitionResponse, Background),
202    (GetImplementation, Background),
203    (GetImplementationResponse, Background),
204    (GetUsers, Foreground),
205    (Hello, Foreground),
206    (IncomingCall, Foreground),
207    (InlayHints, Background),
208    (InlayHintsResponse, Background),
209    (InviteChannelMember, Foreground),
210    (JoinChannel, Foreground),
211    (JoinChannelBuffer, Foreground),
212    (JoinChannelBufferResponse, Foreground),
213    (JoinChannelChat, Foreground),
214    (JoinChannelChatResponse, Foreground),
215    (JoinProject, Foreground),
216    (JoinHostedProject, Foreground),
217    (JoinProjectResponse, Foreground),
218    (JoinRoom, Foreground),
219    (JoinRoomResponse, Foreground),
220    (LanguageModelResponse, Background),
221    (LeaveChannelBuffer, Background),
222    (LeaveChannelChat, Foreground),
223    (LeaveProject, Foreground),
224    (LeaveRoom, Foreground),
225    (MarkNotificationRead, Foreground),
226    (MoveChannel, Foreground),
227    (OnTypeFormatting, Background),
228    (OnTypeFormattingResponse, Background),
229    (OpenBufferById, Background),
230    (OpenBufferByPath, Background),
231    (OpenBufferForSymbol, Background),
232    (OpenBufferForSymbolResponse, Background),
233    (OpenBufferResponse, Background),
234    (PerformRename, Background),
235    (PerformRenameResponse, Background),
236    (Ping, Foreground),
237    (PrepareRename, Background),
238    (PrepareRenameResponse, Background),
239    (ProjectEntryResponse, Foreground),
240    (RefreshInlayHints, Foreground),
241    (RejoinChannelBuffers, Foreground),
242    (RejoinChannelBuffersResponse, Foreground),
243    (RejoinRoom, Foreground),
244    (RejoinRoomResponse, Foreground),
245    (ReloadBuffers, Foreground),
246    (ReloadBuffersResponse, Foreground),
247    (RemoveChannelMember, Foreground),
248    (RemoveChannelMessage, Foreground),
249    (UpdateChannelMessage, Foreground),
250    (RemoveContact, Foreground),
251    (RemoveProjectCollaborator, Foreground),
252    (RenameChannel, Foreground),
253    (RenameChannelResponse, Foreground),
254    (RenameProjectEntry, Foreground),
255    (RequestContact, Foreground),
256    (ResolveCompletionDocumentation, Background),
257    (ResolveCompletionDocumentationResponse, Background),
258    (ResolveInlayHint, Background),
259    (ResolveInlayHintResponse, Background),
260    (RespondToChannelInvite, Foreground),
261    (RespondToContactRequest, Foreground),
262    (RoomUpdated, Foreground),
263    (SaveBuffer, Foreground),
264    (SetChannelMemberRole, Foreground),
265    (SetChannelVisibility, Foreground),
266    (SearchProject, Background),
267    (SearchProjectResponse, Background),
268    (SendChannelMessage, Background),
269    (SendChannelMessageResponse, Background),
270    (ShareProject, Foreground),
271    (ShareProjectResponse, Foreground),
272    (ShowContacts, Foreground),
273    (StartLanguageServer, Foreground),
274    (SynchronizeBuffers, Foreground),
275    (SynchronizeBuffersResponse, Foreground),
276    (Test, Foreground),
277    (Unfollow, Foreground),
278    (UnshareProject, Foreground),
279    (UpdateBuffer, Foreground),
280    (UpdateBufferFile, Foreground),
281    (UpdateChannelBuffer, Foreground),
282    (UpdateChannelBufferCollaborators, Foreground),
283    (UpdateChannels, Foreground),
284    (UpdateUserChannels, Foreground),
285    (UpdateContacts, Foreground),
286    (UpdateDiagnosticSummary, Foreground),
287    (UpdateDiffBase, Foreground),
288    (UpdateFollowers, Foreground),
289    (UpdateInviteInfo, Foreground),
290    (UpdateLanguageServer, Foreground),
291    (UpdateParticipantLocation, Foreground),
292    (UpdateProject, Foreground),
293    (UpdateProjectCollaborator, Foreground),
294    (UpdateWorktree, Foreground),
295    (UpdateWorktreeSettings, Foreground),
296    (UsersResponse, Foreground),
297    (LspExtExpandMacro, Background),
298    (LspExtExpandMacroResponse, Background),
299    (SetRoomParticipantRole, Foreground),
300    (BlameBuffer, Foreground),
301    (BlameBufferResponse, Foreground),
302    (MultiLspQuery, Background),
303    (MultiLspQueryResponse, Background),
304);
305
306request_messages!(
307    (ApplyCodeAction, ApplyCodeActionResponse),
308    (
309        ApplyCompletionAdditionalEdits,
310        ApplyCompletionAdditionalEditsResponse
311    ),
312    (Call, Ack),
313    (CancelCall, Ack),
314    (CopyProjectEntry, ProjectEntryResponse),
315    (CompleteWithLanguageModel, LanguageModelResponse),
316    (CountTokensWithLanguageModel, CountTokensResponse),
317    (CreateChannel, CreateChannelResponse),
318    (CreateProjectEntry, ProjectEntryResponse),
319    (CreateRoom, CreateRoomResponse),
320    (DeclineCall, Ack),
321    (DeleteChannel, Ack),
322    (DeleteProjectEntry, ProjectEntryResponse),
323    (ExpandProjectEntry, ExpandProjectEntryResponse),
324    (Follow, FollowResponse),
325    (FormatBuffers, FormatBuffersResponse),
326    (FuzzySearchUsers, UsersResponse),
327    (GetChannelMembers, GetChannelMembersResponse),
328    (GetChannelMessages, GetChannelMessagesResponse),
329    (GetChannelMessagesById, GetChannelMessagesResponse),
330    (GetCodeActions, GetCodeActionsResponse),
331    (GetCompletions, GetCompletionsResponse),
332    (GetDefinition, GetDefinitionResponse),
333    (GetImplementation, GetImplementationResponse),
334    (GetDocumentHighlights, GetDocumentHighlightsResponse),
335    (GetHover, GetHoverResponse),
336    (GetNotifications, GetNotificationsResponse),
337    (GetPrivateUserInfo, GetPrivateUserInfoResponse),
338    (GetProjectSymbols, GetProjectSymbolsResponse),
339    (GetReferences, GetReferencesResponse),
340    (GetTypeDefinition, GetTypeDefinitionResponse),
341    (GetUsers, UsersResponse),
342    (IncomingCall, Ack),
343    (InlayHints, InlayHintsResponse),
344    (InviteChannelMember, Ack),
345    (JoinChannel, JoinRoomResponse),
346    (JoinChannelBuffer, JoinChannelBufferResponse),
347    (JoinChannelChat, JoinChannelChatResponse),
348    (JoinHostedProject, JoinProjectResponse),
349    (JoinProject, JoinProjectResponse),
350    (JoinRoom, JoinRoomResponse),
351    (LeaveChannelBuffer, Ack),
352    (LeaveRoom, Ack),
353    (MarkNotificationRead, Ack),
354    (MoveChannel, Ack),
355    (OnTypeFormatting, OnTypeFormattingResponse),
356    (OpenBufferById, OpenBufferResponse),
357    (OpenBufferByPath, OpenBufferResponse),
358    (OpenBufferForSymbol, OpenBufferForSymbolResponse),
359    (PerformRename, PerformRenameResponse),
360    (Ping, Ack),
361    (PrepareRename, PrepareRenameResponse),
362    (RefreshInlayHints, Ack),
363    (RejoinChannelBuffers, RejoinChannelBuffersResponse),
364    (RejoinRoom, RejoinRoomResponse),
365    (ReloadBuffers, ReloadBuffersResponse),
366    (RemoveChannelMember, Ack),
367    (RemoveChannelMessage, Ack),
368    (UpdateChannelMessage, Ack),
369    (RemoveContact, Ack),
370    (RenameChannel, RenameChannelResponse),
371    (RenameProjectEntry, ProjectEntryResponse),
372    (RequestContact, Ack),
373    (
374        ResolveCompletionDocumentation,
375        ResolveCompletionDocumentationResponse
376    ),
377    (ResolveInlayHint, ResolveInlayHintResponse),
378    (RespondToChannelInvite, Ack),
379    (RespondToContactRequest, Ack),
380    (SaveBuffer, BufferSaved),
381    (SearchProject, SearchProjectResponse),
382    (SendChannelMessage, SendChannelMessageResponse),
383    (SetChannelMemberRole, Ack),
384    (SetChannelVisibility, Ack),
385    (ShareProject, ShareProjectResponse),
386    (SynchronizeBuffers, SynchronizeBuffersResponse),
387    (Test, Test),
388    (UpdateBuffer, Ack),
389    (UpdateParticipantLocation, Ack),
390    (UpdateProject, Ack),
391    (UpdateWorktree, Ack),
392    (LspExtExpandMacro, LspExtExpandMacroResponse),
393    (SetRoomParticipantRole, Ack),
394    (BlameBuffer, BlameBufferResponse),
395    (MultiLspQuery, MultiLspQueryResponse),
396);
397
398entity_messages!(
399    {project_id, ShareProject},
400    AddProjectCollaborator,
401    ApplyCodeAction,
402    ApplyCompletionAdditionalEdits,
403    BlameBuffer,
404    BufferReloaded,
405    BufferSaved,
406    CopyProjectEntry,
407    CreateBufferForPeer,
408    CreateProjectEntry,
409    DeleteProjectEntry,
410    ExpandProjectEntry,
411    FormatBuffers,
412    GetCodeActions,
413    GetCompletions,
414    GetDefinition,
415    GetImplementation,
416    GetDocumentHighlights,
417    GetHover,
418    GetProjectSymbols,
419    GetReferences,
420    GetTypeDefinition,
421    InlayHints,
422    JoinProject,
423    LeaveProject,
424    MultiLspQuery,
425    OnTypeFormatting,
426    OpenBufferById,
427    OpenBufferByPath,
428    OpenBufferForSymbol,
429    PerformRename,
430    PrepareRename,
431    RefreshInlayHints,
432    ReloadBuffers,
433    RemoveProjectCollaborator,
434    RenameProjectEntry,
435    ResolveCompletionDocumentation,
436    ResolveInlayHint,
437    SaveBuffer,
438    SearchProject,
439    StartLanguageServer,
440    SynchronizeBuffers,
441    UnshareProject,
442    UpdateBuffer,
443    UpdateBufferFile,
444    UpdateDiagnosticSummary,
445    UpdateDiffBase,
446    UpdateLanguageServer,
447    UpdateProject,
448    UpdateProjectCollaborator,
449    UpdateWorktree,
450    UpdateWorktreeSettings,
451    LspExtExpandMacro,
452);
453
454entity_messages!(
455    {channel_id, Channel},
456    ChannelMessageSent,
457    ChannelMessageUpdate,
458    RemoveChannelMessage,
459    UpdateChannelMessage,
460    UpdateChannelBuffer,
461    UpdateChannelBufferCollaborators,
462);
463
464const KIB: usize = 1024;
465const MIB: usize = KIB * 1024;
466const MAX_BUFFER_LEN: usize = MIB;
467
468/// A stream of protobuf messages.
469pub struct MessageStream<S> {
470    stream: S,
471    encoding_buffer: Vec<u8>,
472}
473
474#[allow(clippy::large_enum_variant)]
475#[derive(Debug)]
476pub enum Message {
477    Envelope(Envelope),
478    Ping,
479    Pong,
480}
481
482impl<S> MessageStream<S> {
483    pub fn new(stream: S) -> Self {
484        Self {
485            stream,
486            encoding_buffer: Vec::new(),
487        }
488    }
489
490    pub fn inner_mut(&mut self) -> &mut S {
491        &mut self.stream
492    }
493}
494
495impl<S> MessageStream<S>
496where
497    S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
498{
499    pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
500        #[cfg(any(test, feature = "test-support"))]
501        const COMPRESSION_LEVEL: i32 = -7;
502
503        #[cfg(not(any(test, feature = "test-support")))]
504        const COMPRESSION_LEVEL: i32 = 4;
505
506        match message {
507            Message::Envelope(message) => {
508                self.encoding_buffer.reserve(message.encoded_len());
509                message
510                    .encode(&mut self.encoding_buffer)
511                    .map_err(io::Error::from)?;
512                let buffer =
513                    zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
514                        .unwrap();
515
516                self.encoding_buffer.clear();
517                self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
518                self.stream.send(WebSocketMessage::Binary(buffer)).await?;
519            }
520            Message::Ping => {
521                self.stream
522                    .send(WebSocketMessage::Ping(Default::default()))
523                    .await?;
524            }
525            Message::Pong => {
526                self.stream
527                    .send(WebSocketMessage::Pong(Default::default()))
528                    .await?;
529            }
530        }
531
532        Ok(())
533    }
534}
535
536impl<S> MessageStream<S>
537where
538    S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
539{
540    pub async fn read(&mut self) -> Result<(Message, Instant), anyhow::Error> {
541        while let Some(bytes) = self.stream.next().await {
542            let received_at = Instant::now();
543            match bytes? {
544                WebSocketMessage::Binary(bytes) => {
545                    zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
546                    let envelope = Envelope::decode(self.encoding_buffer.as_slice())
547                        .map_err(io::Error::from)?;
548
549                    self.encoding_buffer.clear();
550                    self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
551                    return Ok((Message::Envelope(envelope), received_at));
552                }
553                WebSocketMessage::Ping(_) => return Ok((Message::Ping, received_at)),
554                WebSocketMessage::Pong(_) => return Ok((Message::Pong, received_at)),
555                WebSocketMessage::Close(_) => break,
556                _ => {}
557            }
558        }
559        Err(anyhow!("connection closed"))
560    }
561}
562
563impl From<Timestamp> for SystemTime {
564    fn from(val: Timestamp) -> Self {
565        UNIX_EPOCH
566            .checked_add(Duration::new(val.seconds, val.nanos))
567            .unwrap()
568    }
569}
570
571impl From<SystemTime> for Timestamp {
572    fn from(time: SystemTime) -> Self {
573        let duration = time.duration_since(UNIX_EPOCH).unwrap();
574        Self {
575            seconds: duration.as_secs(),
576            nanos: duration.subsec_nanos(),
577        }
578    }
579}
580
581impl From<u128> for Nonce {
582    fn from(nonce: u128) -> Self {
583        let upper_half = (nonce >> 64) as u64;
584        let lower_half = nonce as u64;
585        Self {
586            upper_half,
587            lower_half,
588        }
589    }
590}
591
592impl From<Nonce> for u128 {
593    fn from(nonce: Nonce) -> Self {
594        let upper_half = (nonce.upper_half as u128) << 64;
595        let lower_half = nonce.lower_half as u128;
596        upper_half | lower_half
597    }
598}
599
600pub fn split_worktree_update(
601    mut message: UpdateWorktree,
602    max_chunk_size: usize,
603) -> impl Iterator<Item = UpdateWorktree> {
604    let mut done_files = false;
605
606    let mut repository_map = message
607        .updated_repositories
608        .into_iter()
609        .map(|repo| (repo.work_directory_id, repo))
610        .collect::<HashMap<_, _>>();
611
612    iter::from_fn(move || {
613        if done_files {
614            return None;
615        }
616
617        let updated_entries_chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
618        let updated_entries: Vec<_> = message
619            .updated_entries
620            .drain(..updated_entries_chunk_size)
621            .collect();
622
623        let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size);
624        let removed_entries = message
625            .removed_entries
626            .drain(..removed_entries_chunk_size)
627            .collect();
628
629        done_files = message.updated_entries.is_empty() && message.removed_entries.is_empty();
630
631        let mut updated_repositories = Vec::new();
632
633        if !repository_map.is_empty() {
634            for entry in &updated_entries {
635                if let Some(repo) = repository_map.remove(&entry.id) {
636                    updated_repositories.push(repo)
637                }
638            }
639        }
640
641        let removed_repositories = if done_files {
642            mem::take(&mut message.removed_repositories)
643        } else {
644            Default::default()
645        };
646
647        if done_files {
648            updated_repositories.extend(mem::take(&mut repository_map).into_values());
649        }
650
651        Some(UpdateWorktree {
652            project_id: message.project_id,
653            worktree_id: message.worktree_id,
654            root_name: message.root_name.clone(),
655            abs_path: message.abs_path.clone(),
656            updated_entries,
657            removed_entries,
658            scan_id: message.scan_id,
659            is_last_update: done_files && message.is_last_update,
660            updated_repositories,
661            removed_repositories,
662        })
663    })
664}
665
666#[cfg(test)]
667mod tests {
668    use super::*;
669
670    #[gpui::test]
671    async fn test_buffer_size() {
672        let (tx, rx) = futures::channel::mpsc::unbounded();
673        let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
674        sink.write(Message::Envelope(Envelope {
675            payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
676                root_name: "abcdefg".repeat(10),
677                ..Default::default()
678            })),
679            ..Default::default()
680        }))
681        .await
682        .unwrap();
683        assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
684        sink.write(Message::Envelope(Envelope {
685            payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
686                root_name: "abcdefg".repeat(1000000),
687                ..Default::default()
688            })),
689            ..Default::default()
690        }))
691        .await
692        .unwrap();
693        assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
694
695        let mut stream = MessageStream::new(rx.map(anyhow::Ok));
696        stream.read().await.unwrap();
697        assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
698        stream.read().await.unwrap();
699        assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
700    }
701
702    #[gpui::test]
703    fn test_converting_peer_id_from_and_to_u64() {
704        let peer_id = PeerId {
705            owner_id: 10,
706            id: 3,
707        };
708        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
709        let peer_id = PeerId {
710            owner_id: u32::MAX,
711            id: 3,
712        };
713        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
714        let peer_id = PeerId {
715            owner_id: 10,
716            id: u32::MAX,
717        };
718        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
719        let peer_id = PeerId {
720            owner_id: u32::MAX,
721            id: u32::MAX,
722        };
723        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
724    }
725}