proto.rs

  1#![allow(non_snake_case)]
  2
  3use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
  4use anyhow::{anyhow, Result};
  5use async_tungstenite::tungstenite::Message as WebSocketMessage;
  6use collections::HashMap;
  7use futures::{SinkExt as _, StreamExt as _};
  8use prost::Message as _;
  9use serde::Serialize;
 10use std::any::{Any, TypeId};
 11use std::time::Instant;
 12use std::{
 13    cmp,
 14    fmt::Debug,
 15    io, iter,
 16    time::{Duration, SystemTime, UNIX_EPOCH},
 17};
 18use std::{fmt, mem};
 19
 20include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
 21
 22pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
 23    const NAME: &'static str;
 24    const PRIORITY: MessagePriority;
 25    fn into_envelope(
 26        self,
 27        id: u32,
 28        responding_to: Option<u32>,
 29        original_sender_id: Option<PeerId>,
 30    ) -> Envelope;
 31    fn from_envelope(envelope: Envelope) -> Option<Self>;
 32}
 33
 34pub trait EntityMessage: EnvelopedMessage {
 35    type Entity;
 36    fn remote_entity_id(&self) -> u64;
 37}
 38
 39pub trait RequestMessage: EnvelopedMessage {
 40    type Response: EnvelopedMessage;
 41}
 42
 43pub trait AnyTypedEnvelope: 'static + Send + Sync {
 44    fn payload_type_id(&self) -> TypeId;
 45    fn payload_type_name(&self) -> &'static str;
 46    fn as_any(&self) -> &dyn Any;
 47    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
 48    fn is_background(&self) -> bool;
 49    fn original_sender_id(&self) -> Option<PeerId>;
 50    fn sender_id(&self) -> ConnectionId;
 51    fn message_id(&self) -> u32;
 52}
 53
 54pub enum MessagePriority {
 55    Foreground,
 56    Background,
 57}
 58
 59impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
 60    fn payload_type_id(&self) -> TypeId {
 61        TypeId::of::<T>()
 62    }
 63
 64    fn payload_type_name(&self) -> &'static str {
 65        T::NAME
 66    }
 67
 68    fn as_any(&self) -> &dyn Any {
 69        self
 70    }
 71
 72    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
 73        self
 74    }
 75
 76    fn is_background(&self) -> bool {
 77        matches!(T::PRIORITY, MessagePriority::Background)
 78    }
 79
 80    fn original_sender_id(&self) -> Option<PeerId> {
 81        self.original_sender_id
 82    }
 83
 84    fn sender_id(&self) -> ConnectionId {
 85        self.sender_id
 86    }
 87
 88    fn message_id(&self) -> u32 {
 89        self.message_id
 90    }
 91}
 92
 93impl PeerId {
 94    pub fn from_u64(peer_id: u64) -> Self {
 95        let owner_id = (peer_id >> 32) as u32;
 96        let id = peer_id as u32;
 97        Self { owner_id, id }
 98    }
 99
100    pub fn as_u64(self) -> u64 {
101        ((self.owner_id as u64) << 32) | (self.id as u64)
102    }
103}
104
105impl Copy for PeerId {}
106
107impl Eq for PeerId {}
108
109impl Ord for PeerId {
110    fn cmp(&self, other: &Self) -> cmp::Ordering {
111        self.owner_id
112            .cmp(&other.owner_id)
113            .then_with(|| self.id.cmp(&other.id))
114    }
115}
116
117impl PartialOrd for PeerId {
118    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
119        Some(self.cmp(other))
120    }
121}
122
123impl std::hash::Hash for PeerId {
124    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
125        self.owner_id.hash(state);
126        self.id.hash(state);
127    }
128}
129
130impl fmt::Display for PeerId {
131    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
132        write!(f, "{}/{}", self.owner_id, self.id)
133    }
134}
135
136messages!(
137    (Ack, Foreground),
138    (AckBufferOperation, Background),
139    (AckChannelMessage, Background),
140    (AddNotification, Foreground),
141    (AddProjectCollaborator, Foreground),
142    (ApplyCodeAction, Background),
143    (ApplyCodeActionResponse, Background),
144    (ApplyCompletionAdditionalEdits, Background),
145    (ApplyCompletionAdditionalEditsResponse, Background),
146    (BufferReloaded, Foreground),
147    (BufferSaved, Foreground),
148    (Call, Foreground),
149    (CallCanceled, Foreground),
150    (CancelCall, Foreground),
151    (ChannelMessageSent, Foreground),
152    (ChannelMessageUpdate, Foreground),
153    (CompleteWithLanguageModel, Background),
154    (CopyProjectEntry, Foreground),
155    (CountTokensWithLanguageModel, Background),
156    (CountTokensResponse, Background),
157    (CreateBufferForPeer, Foreground),
158    (CreateChannel, Foreground),
159    (CreateChannelResponse, Foreground),
160    (CreateProjectEntry, Foreground),
161    (CreateRoom, Foreground),
162    (CreateRoomResponse, Foreground),
163    (DeclineCall, Foreground),
164    (DeleteChannel, Foreground),
165    (DeleteNotification, Foreground),
166    (DeleteProjectEntry, Foreground),
167    (EndStream, Foreground),
168    (Error, Foreground),
169    (ExpandProjectEntry, Foreground),
170    (ExpandProjectEntryResponse, Foreground),
171    (Follow, Foreground),
172    (FollowResponse, Foreground),
173    (FormatBuffers, Foreground),
174    (FormatBuffersResponse, Foreground),
175    (FuzzySearchUsers, Foreground),
176    (GetChannelMembers, Foreground),
177    (GetChannelMembersResponse, Foreground),
178    (GetChannelMessages, Background),
179    (GetChannelMessagesById, Background),
180    (GetChannelMessagesResponse, Background),
181    (GetCodeActions, Background),
182    (GetCodeActionsResponse, Background),
183    (GetCompletions, Background),
184    (GetCompletionsResponse, Background),
185    (GetDefinition, Background),
186    (GetDefinitionResponse, Background),
187    (GetDocumentHighlights, Background),
188    (GetDocumentHighlightsResponse, Background),
189    (GetHover, Background),
190    (GetHoverResponse, Background),
191    (GetNotifications, Foreground),
192    (GetNotificationsResponse, Foreground),
193    (GetPrivateUserInfo, Foreground),
194    (GetPrivateUserInfoResponse, Foreground),
195    (GetProjectSymbols, Background),
196    (GetProjectSymbolsResponse, Background),
197    (GetReferences, Background),
198    (GetReferencesResponse, Background),
199    (GetTypeDefinition, Background),
200    (GetTypeDefinitionResponse, Background),
201    (GetImplementation, Background),
202    (GetImplementationResponse, Background),
203    (GetUsers, Foreground),
204    (Hello, Foreground),
205    (IncomingCall, Foreground),
206    (InlayHints, Background),
207    (InlayHintsResponse, Background),
208    (InviteChannelMember, Foreground),
209    (JoinChannel, Foreground),
210    (JoinChannelBuffer, Foreground),
211    (JoinChannelBufferResponse, Foreground),
212    (JoinChannelChat, Foreground),
213    (JoinChannelChatResponse, Foreground),
214    (JoinProject, Foreground),
215    (JoinHostedProject, Foreground),
216    (JoinProjectResponse, Foreground),
217    (JoinRoom, Foreground),
218    (JoinRoomResponse, Foreground),
219    (LanguageModelResponse, Background),
220    (LeaveChannelBuffer, Background),
221    (LeaveChannelChat, Foreground),
222    (LeaveProject, Foreground),
223    (LeaveRoom, Foreground),
224    (MarkNotificationRead, Foreground),
225    (MoveChannel, Foreground),
226    (OnTypeFormatting, Background),
227    (OnTypeFormattingResponse, Background),
228    (OpenBufferById, Background),
229    (OpenBufferByPath, Background),
230    (OpenBufferForSymbol, Background),
231    (OpenBufferForSymbolResponse, Background),
232    (OpenBufferResponse, Background),
233    (PerformRename, Background),
234    (PerformRenameResponse, Background),
235    (Ping, Foreground),
236    (PrepareRename, Background),
237    (PrepareRenameResponse, Background),
238    (ProjectEntryResponse, Foreground),
239    (RefreshInlayHints, Foreground),
240    (RejoinChannelBuffers, Foreground),
241    (RejoinChannelBuffersResponse, Foreground),
242    (RejoinRoom, Foreground),
243    (RejoinRoomResponse, Foreground),
244    (ReloadBuffers, Foreground),
245    (ReloadBuffersResponse, Foreground),
246    (RemoveChannelMember, Foreground),
247    (RemoveChannelMessage, Foreground),
248    (UpdateChannelMessage, Foreground),
249    (RemoveContact, Foreground),
250    (RemoveProjectCollaborator, Foreground),
251    (RenameChannel, Foreground),
252    (RenameChannelResponse, Foreground),
253    (RenameProjectEntry, Foreground),
254    (RequestContact, Foreground),
255    (ResolveCompletionDocumentation, Background),
256    (ResolveCompletionDocumentationResponse, Background),
257    (ResolveInlayHint, Background),
258    (ResolveInlayHintResponse, Background),
259    (RespondToChannelInvite, Foreground),
260    (RespondToContactRequest, Foreground),
261    (RoomUpdated, Foreground),
262    (SaveBuffer, Foreground),
263    (SetChannelMemberRole, Foreground),
264    (SetChannelVisibility, Foreground),
265    (SearchProject, Background),
266    (SearchProjectResponse, Background),
267    (SendChannelMessage, Background),
268    (SendChannelMessageResponse, Background),
269    (ShareProject, Foreground),
270    (ShareProjectResponse, Foreground),
271    (ShowContacts, Foreground),
272    (StartLanguageServer, Foreground),
273    (SynchronizeBuffers, Foreground),
274    (SynchronizeBuffersResponse, Foreground),
275    (Test, Foreground),
276    (Unfollow, Foreground),
277    (UnshareProject, Foreground),
278    (UpdateBuffer, Foreground),
279    (UpdateBufferFile, Foreground),
280    (UpdateChannelBuffer, Foreground),
281    (UpdateChannelBufferCollaborators, Foreground),
282    (UpdateChannels, Foreground),
283    (UpdateUserChannels, Foreground),
284    (UpdateContacts, Foreground),
285    (UpdateDiagnosticSummary, Foreground),
286    (UpdateDiffBase, Foreground),
287    (UpdateFollowers, Foreground),
288    (UpdateInviteInfo, Foreground),
289    (UpdateLanguageServer, Foreground),
290    (UpdateParticipantLocation, Foreground),
291    (UpdateProject, Foreground),
292    (UpdateProjectCollaborator, Foreground),
293    (UpdateWorktree, Foreground),
294    (UpdateWorktreeSettings, Foreground),
295    (UsersResponse, Foreground),
296    (LspExtExpandMacro, Background),
297    (LspExtExpandMacroResponse, Background),
298    (SetRoomParticipantRole, Foreground),
299);
300
301request_messages!(
302    (ApplyCodeAction, ApplyCodeActionResponse),
303    (
304        ApplyCompletionAdditionalEdits,
305        ApplyCompletionAdditionalEditsResponse
306    ),
307    (Call, Ack),
308    (CancelCall, Ack),
309    (CopyProjectEntry, ProjectEntryResponse),
310    (CompleteWithLanguageModel, LanguageModelResponse),
311    (CountTokensWithLanguageModel, CountTokensResponse),
312    (CreateChannel, CreateChannelResponse),
313    (CreateProjectEntry, ProjectEntryResponse),
314    (CreateRoom, CreateRoomResponse),
315    (DeclineCall, Ack),
316    (DeleteChannel, Ack),
317    (DeleteProjectEntry, ProjectEntryResponse),
318    (ExpandProjectEntry, ExpandProjectEntryResponse),
319    (Follow, FollowResponse),
320    (FormatBuffers, FormatBuffersResponse),
321    (FuzzySearchUsers, UsersResponse),
322    (GetChannelMembers, GetChannelMembersResponse),
323    (GetChannelMessages, GetChannelMessagesResponse),
324    (GetChannelMessagesById, GetChannelMessagesResponse),
325    (GetCodeActions, GetCodeActionsResponse),
326    (GetCompletions, GetCompletionsResponse),
327    (GetDefinition, GetDefinitionResponse),
328    (GetImplementation, GetImplementationResponse),
329    (GetDocumentHighlights, GetDocumentHighlightsResponse),
330    (GetHover, GetHoverResponse),
331    (GetNotifications, GetNotificationsResponse),
332    (GetPrivateUserInfo, GetPrivateUserInfoResponse),
333    (GetProjectSymbols, GetProjectSymbolsResponse),
334    (GetReferences, GetReferencesResponse),
335    (GetTypeDefinition, GetTypeDefinitionResponse),
336    (GetUsers, UsersResponse),
337    (IncomingCall, Ack),
338    (InlayHints, InlayHintsResponse),
339    (InviteChannelMember, Ack),
340    (JoinChannel, JoinRoomResponse),
341    (JoinChannelBuffer, JoinChannelBufferResponse),
342    (JoinChannelChat, JoinChannelChatResponse),
343    (JoinHostedProject, JoinProjectResponse),
344    (JoinProject, JoinProjectResponse),
345    (JoinRoom, JoinRoomResponse),
346    (LeaveChannelBuffer, Ack),
347    (LeaveRoom, Ack),
348    (MarkNotificationRead, Ack),
349    (MoveChannel, Ack),
350    (OnTypeFormatting, OnTypeFormattingResponse),
351    (OpenBufferById, OpenBufferResponse),
352    (OpenBufferByPath, OpenBufferResponse),
353    (OpenBufferForSymbol, OpenBufferForSymbolResponse),
354    (PerformRename, PerformRenameResponse),
355    (Ping, Ack),
356    (PrepareRename, PrepareRenameResponse),
357    (RefreshInlayHints, Ack),
358    (RejoinChannelBuffers, RejoinChannelBuffersResponse),
359    (RejoinRoom, RejoinRoomResponse),
360    (ReloadBuffers, ReloadBuffersResponse),
361    (RemoveChannelMember, Ack),
362    (RemoveChannelMessage, Ack),
363    (UpdateChannelMessage, Ack),
364    (RemoveContact, Ack),
365    (RenameChannel, RenameChannelResponse),
366    (RenameProjectEntry, ProjectEntryResponse),
367    (RequestContact, Ack),
368    (
369        ResolveCompletionDocumentation,
370        ResolveCompletionDocumentationResponse
371    ),
372    (ResolveInlayHint, ResolveInlayHintResponse),
373    (RespondToChannelInvite, Ack),
374    (RespondToContactRequest, Ack),
375    (SaveBuffer, BufferSaved),
376    (SearchProject, SearchProjectResponse),
377    (SendChannelMessage, SendChannelMessageResponse),
378    (SetChannelMemberRole, Ack),
379    (SetChannelVisibility, Ack),
380    (ShareProject, ShareProjectResponse),
381    (SynchronizeBuffers, SynchronizeBuffersResponse),
382    (Test, Test),
383    (UpdateBuffer, Ack),
384    (UpdateParticipantLocation, Ack),
385    (UpdateProject, Ack),
386    (UpdateWorktree, Ack),
387    (LspExtExpandMacro, LspExtExpandMacroResponse),
388    (SetRoomParticipantRole, Ack),
389);
390
391entity_messages!(
392    {project_id, ShareProject},
393    AddProjectCollaborator,
394    ApplyCodeAction,
395    ApplyCompletionAdditionalEdits,
396    BufferReloaded,
397    BufferSaved,
398    CopyProjectEntry,
399    CreateBufferForPeer,
400    CreateProjectEntry,
401    DeleteProjectEntry,
402    ExpandProjectEntry,
403    FormatBuffers,
404    GetCodeActions,
405    GetCompletions,
406    GetDefinition,
407    GetImplementation,
408    GetDocumentHighlights,
409    GetHover,
410    GetProjectSymbols,
411    GetReferences,
412    GetTypeDefinition,
413    InlayHints,
414    JoinProject,
415    LeaveProject,
416    OnTypeFormatting,
417    OpenBufferById,
418    OpenBufferByPath,
419    OpenBufferForSymbol,
420    PerformRename,
421    PrepareRename,
422    RefreshInlayHints,
423    ReloadBuffers,
424    RemoveProjectCollaborator,
425    RenameProjectEntry,
426    ResolveCompletionDocumentation,
427    ResolveInlayHint,
428    SaveBuffer,
429    SearchProject,
430    StartLanguageServer,
431    SynchronizeBuffers,
432    UnshareProject,
433    UpdateBuffer,
434    UpdateBufferFile,
435    UpdateDiagnosticSummary,
436    UpdateDiffBase,
437    UpdateLanguageServer,
438    UpdateProject,
439    UpdateProjectCollaborator,
440    UpdateWorktree,
441    UpdateWorktreeSettings,
442    LspExtExpandMacro,
443);
444
445entity_messages!(
446    {channel_id, Channel},
447    ChannelMessageSent,
448    ChannelMessageUpdate,
449    RemoveChannelMessage,
450    UpdateChannelMessage,
451    UpdateChannelBuffer,
452    UpdateChannelBufferCollaborators,
453);
454
455const KIB: usize = 1024;
456const MIB: usize = KIB * 1024;
457const MAX_BUFFER_LEN: usize = MIB;
458
459/// A stream of protobuf messages.
460pub struct MessageStream<S> {
461    stream: S,
462    encoding_buffer: Vec<u8>,
463}
464
465#[allow(clippy::large_enum_variant)]
466#[derive(Debug)]
467pub enum Message {
468    Envelope(Envelope),
469    Ping,
470    Pong,
471}
472
473impl<S> MessageStream<S> {
474    pub fn new(stream: S) -> Self {
475        Self {
476            stream,
477            encoding_buffer: Vec::new(),
478        }
479    }
480
481    pub fn inner_mut(&mut self) -> &mut S {
482        &mut self.stream
483    }
484}
485
486impl<S> MessageStream<S>
487where
488    S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
489{
490    pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
491        #[cfg(any(test, feature = "test-support"))]
492        const COMPRESSION_LEVEL: i32 = -7;
493
494        #[cfg(not(any(test, feature = "test-support")))]
495        const COMPRESSION_LEVEL: i32 = 4;
496
497        match message {
498            Message::Envelope(message) => {
499                self.encoding_buffer.reserve(message.encoded_len());
500                message
501                    .encode(&mut self.encoding_buffer)
502                    .map_err(io::Error::from)?;
503                let buffer =
504                    zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
505                        .unwrap();
506
507                self.encoding_buffer.clear();
508                self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
509                self.stream.send(WebSocketMessage::Binary(buffer)).await?;
510            }
511            Message::Ping => {
512                self.stream
513                    .send(WebSocketMessage::Ping(Default::default()))
514                    .await?;
515            }
516            Message::Pong => {
517                self.stream
518                    .send(WebSocketMessage::Pong(Default::default()))
519                    .await?;
520            }
521        }
522
523        Ok(())
524    }
525}
526
527impl<S> MessageStream<S>
528where
529    S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
530{
531    pub async fn read(&mut self) -> Result<(Message, Instant), anyhow::Error> {
532        while let Some(bytes) = self.stream.next().await {
533            let received_at = Instant::now();
534            match bytes? {
535                WebSocketMessage::Binary(bytes) => {
536                    zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
537                    let envelope = Envelope::decode(self.encoding_buffer.as_slice())
538                        .map_err(io::Error::from)?;
539
540                    self.encoding_buffer.clear();
541                    self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
542                    return Ok((Message::Envelope(envelope), received_at));
543                }
544                WebSocketMessage::Ping(_) => return Ok((Message::Ping, received_at)),
545                WebSocketMessage::Pong(_) => return Ok((Message::Pong, received_at)),
546                WebSocketMessage::Close(_) => break,
547                _ => {}
548            }
549        }
550        Err(anyhow!("connection closed"))
551    }
552}
553
554impl From<Timestamp> for SystemTime {
555    fn from(val: Timestamp) -> Self {
556        UNIX_EPOCH
557            .checked_add(Duration::new(val.seconds, val.nanos))
558            .unwrap()
559    }
560}
561
562impl From<SystemTime> for Timestamp {
563    fn from(time: SystemTime) -> Self {
564        let duration = time.duration_since(UNIX_EPOCH).unwrap();
565        Self {
566            seconds: duration.as_secs(),
567            nanos: duration.subsec_nanos(),
568        }
569    }
570}
571
572impl From<u128> for Nonce {
573    fn from(nonce: u128) -> Self {
574        let upper_half = (nonce >> 64) as u64;
575        let lower_half = nonce as u64;
576        Self {
577            upper_half,
578            lower_half,
579        }
580    }
581}
582
583impl From<Nonce> for u128 {
584    fn from(nonce: Nonce) -> Self {
585        let upper_half = (nonce.upper_half as u128) << 64;
586        let lower_half = nonce.lower_half as u128;
587        upper_half | lower_half
588    }
589}
590
591pub fn split_worktree_update(
592    mut message: UpdateWorktree,
593    max_chunk_size: usize,
594) -> impl Iterator<Item = UpdateWorktree> {
595    let mut done_files = false;
596
597    let mut repository_map = message
598        .updated_repositories
599        .into_iter()
600        .map(|repo| (repo.work_directory_id, repo))
601        .collect::<HashMap<_, _>>();
602
603    iter::from_fn(move || {
604        if done_files {
605            return None;
606        }
607
608        let updated_entries_chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
609        let updated_entries: Vec<_> = message
610            .updated_entries
611            .drain(..updated_entries_chunk_size)
612            .collect();
613
614        let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size);
615        let removed_entries = message
616            .removed_entries
617            .drain(..removed_entries_chunk_size)
618            .collect();
619
620        done_files = message.updated_entries.is_empty() && message.removed_entries.is_empty();
621
622        let mut updated_repositories = Vec::new();
623
624        if !repository_map.is_empty() {
625            for entry in &updated_entries {
626                if let Some(repo) = repository_map.remove(&entry.id) {
627                    updated_repositories.push(repo)
628                }
629            }
630        }
631
632        let removed_repositories = if done_files {
633            mem::take(&mut message.removed_repositories)
634        } else {
635            Default::default()
636        };
637
638        if done_files {
639            updated_repositories.extend(mem::take(&mut repository_map).into_values());
640        }
641
642        Some(UpdateWorktree {
643            project_id: message.project_id,
644            worktree_id: message.worktree_id,
645            root_name: message.root_name.clone(),
646            abs_path: message.abs_path.clone(),
647            updated_entries,
648            removed_entries,
649            scan_id: message.scan_id,
650            is_last_update: done_files && message.is_last_update,
651            updated_repositories,
652            removed_repositories,
653        })
654    })
655}
656
657#[cfg(test)]
658mod tests {
659    use super::*;
660
661    #[gpui::test]
662    async fn test_buffer_size() {
663        let (tx, rx) = futures::channel::mpsc::unbounded();
664        let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
665        sink.write(Message::Envelope(Envelope {
666            payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
667                root_name: "abcdefg".repeat(10),
668                ..Default::default()
669            })),
670            ..Default::default()
671        }))
672        .await
673        .unwrap();
674        assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
675        sink.write(Message::Envelope(Envelope {
676            payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
677                root_name: "abcdefg".repeat(1000000),
678                ..Default::default()
679            })),
680            ..Default::default()
681        }))
682        .await
683        .unwrap();
684        assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
685
686        let mut stream = MessageStream::new(rx.map(anyhow::Ok));
687        stream.read().await.unwrap();
688        assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
689        stream.read().await.unwrap();
690        assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
691    }
692
693    #[gpui::test]
694    fn test_converting_peer_id_from_and_to_u64() {
695        let peer_id = PeerId {
696            owner_id: 10,
697            id: 3,
698        };
699        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
700        let peer_id = PeerId {
701            owner_id: u32::MAX,
702            id: 3,
703        };
704        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
705        let peer_id = PeerId {
706            owner_id: 10,
707            id: u32::MAX,
708        };
709        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
710        let peer_id = PeerId {
711            owner_id: u32::MAX,
712            id: u32::MAX,
713        };
714        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
715    }
716}