proto.rs

  1#![allow(non_snake_case)]
  2
  3use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
  4use anyhow::{anyhow, Result};
  5use async_tungstenite::tungstenite::Message as WebSocketMessage;
  6use collections::HashMap;
  7use futures::{SinkExt as _, StreamExt as _};
  8use prost::Message as _;
  9use serde::Serialize;
 10use std::any::{Any, TypeId};
 11use std::time::Instant;
 12use std::{
 13    cmp,
 14    fmt::Debug,
 15    io, iter,
 16    time::{Duration, SystemTime, UNIX_EPOCH},
 17};
 18use std::{fmt, mem};
 19
 20include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
 21
 22pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
 23    const NAME: &'static str;
 24    const PRIORITY: MessagePriority;
 25    fn into_envelope(
 26        self,
 27        id: u32,
 28        responding_to: Option<u32>,
 29        original_sender_id: Option<PeerId>,
 30    ) -> Envelope;
 31    fn from_envelope(envelope: Envelope) -> Option<Self>;
 32}
 33
 34pub trait EntityMessage: EnvelopedMessage {
 35    type Entity;
 36    fn remote_entity_id(&self) -> u64;
 37}
 38
 39pub trait RequestMessage: EnvelopedMessage {
 40    type Response: EnvelopedMessage;
 41}
 42
 43pub trait AnyTypedEnvelope: 'static + Send + Sync {
 44    fn payload_type_id(&self) -> TypeId;
 45    fn payload_type_name(&self) -> &'static str;
 46    fn as_any(&self) -> &dyn Any;
 47    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
 48    fn is_background(&self) -> bool;
 49    fn original_sender_id(&self) -> Option<PeerId>;
 50    fn sender_id(&self) -> ConnectionId;
 51    fn message_id(&self) -> u32;
 52}
 53
 54pub enum MessagePriority {
 55    Foreground,
 56    Background,
 57}
 58
 59impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
 60    fn payload_type_id(&self) -> TypeId {
 61        TypeId::of::<T>()
 62    }
 63
 64    fn payload_type_name(&self) -> &'static str {
 65        T::NAME
 66    }
 67
 68    fn as_any(&self) -> &dyn Any {
 69        self
 70    }
 71
 72    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
 73        self
 74    }
 75
 76    fn is_background(&self) -> bool {
 77        matches!(T::PRIORITY, MessagePriority::Background)
 78    }
 79
 80    fn original_sender_id(&self) -> Option<PeerId> {
 81        self.original_sender_id
 82    }
 83
 84    fn sender_id(&self) -> ConnectionId {
 85        self.sender_id
 86    }
 87
 88    fn message_id(&self) -> u32 {
 89        self.message_id
 90    }
 91}
 92
 93impl PeerId {
 94    pub fn from_u64(peer_id: u64) -> Self {
 95        let owner_id = (peer_id >> 32) as u32;
 96        let id = peer_id as u32;
 97        Self { owner_id, id }
 98    }
 99
100    pub fn as_u64(self) -> u64 {
101        ((self.owner_id as u64) << 32) | (self.id as u64)
102    }
103}
104
105impl Copy for PeerId {}
106
107impl Eq for PeerId {}
108
109impl Ord for PeerId {
110    fn cmp(&self, other: &Self) -> cmp::Ordering {
111        self.owner_id
112            .cmp(&other.owner_id)
113            .then_with(|| self.id.cmp(&other.id))
114    }
115}
116
117impl PartialOrd for PeerId {
118    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
119        Some(self.cmp(other))
120    }
121}
122
123impl std::hash::Hash for PeerId {
124    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
125        self.owner_id.hash(state);
126        self.id.hash(state);
127    }
128}
129
130impl fmt::Display for PeerId {
131    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
132        write!(f, "{}/{}", self.owner_id, self.id)
133    }
134}
135
136messages!(
137    (Ack, Foreground),
138    (AckBufferOperation, Background),
139    (AckChannelMessage, Background),
140    (AddNotification, Foreground),
141    (AddProjectCollaborator, Foreground),
142    (ApplyCodeAction, Background),
143    (ApplyCodeActionResponse, Background),
144    (ApplyCompletionAdditionalEdits, Background),
145    (ApplyCompletionAdditionalEditsResponse, Background),
146    (BufferReloaded, Foreground),
147    (BufferSaved, Foreground),
148    (Call, Foreground),
149    (CallCanceled, Foreground),
150    (CancelCall, Foreground),
151    (ChannelMessageSent, Foreground),
152    (CompleteWithLanguageModel, Background),
153    (CopyProjectEntry, Foreground),
154    (CountTokensWithLanguageModel, Background),
155    (CountTokensResponse, Background),
156    (CreateBufferForPeer, Foreground),
157    (CreateChannel, Foreground),
158    (CreateChannelResponse, Foreground),
159    (CreateProjectEntry, Foreground),
160    (CreateRoom, Foreground),
161    (CreateRoomResponse, Foreground),
162    (DeclineCall, Foreground),
163    (DeleteChannel, Foreground),
164    (DeleteNotification, Foreground),
165    (DeleteProjectEntry, Foreground),
166    (EndStream, Foreground),
167    (Error, Foreground),
168    (ExpandProjectEntry, Foreground),
169    (ExpandProjectEntryResponse, Foreground),
170    (Follow, Foreground),
171    (FollowResponse, Foreground),
172    (FormatBuffers, Foreground),
173    (FormatBuffersResponse, Foreground),
174    (FuzzySearchUsers, Foreground),
175    (GetChannelMembers, Foreground),
176    (GetChannelMembersResponse, Foreground),
177    (GetChannelMessages, Background),
178    (GetChannelMessagesById, Background),
179    (GetChannelMessagesResponse, Background),
180    (GetCodeActions, Background),
181    (GetCodeActionsResponse, Background),
182    (GetCompletions, Background),
183    (GetCompletionsResponse, Background),
184    (GetDefinition, Background),
185    (GetDefinitionResponse, Background),
186    (GetDocumentHighlights, Background),
187    (GetDocumentHighlightsResponse, Background),
188    (GetHover, Background),
189    (GetHoverResponse, Background),
190    (GetNotifications, Foreground),
191    (GetNotificationsResponse, Foreground),
192    (GetPrivateUserInfo, Foreground),
193    (GetPrivateUserInfoResponse, Foreground),
194    (GetProjectSymbols, Background),
195    (GetProjectSymbolsResponse, Background),
196    (GetReferences, Background),
197    (GetReferencesResponse, Background),
198    (GetTypeDefinition, Background),
199    (GetTypeDefinitionResponse, Background),
200    (GetImplementation, Background),
201    (GetImplementationResponse, Background),
202    (GetUsers, Foreground),
203    (Hello, Foreground),
204    (IncomingCall, Foreground),
205    (InlayHints, Background),
206    (InlayHintsResponse, Background),
207    (InviteChannelMember, Foreground),
208    (JoinChannel, Foreground),
209    (JoinChannelBuffer, Foreground),
210    (JoinChannelBufferResponse, Foreground),
211    (JoinChannelChat, Foreground),
212    (JoinChannelChatResponse, Foreground),
213    (JoinProject, Foreground),
214    (JoinHostedProject, Foreground),
215    (JoinProjectResponse, Foreground),
216    (JoinRoom, Foreground),
217    (JoinRoomResponse, Foreground),
218    (LanguageModelResponse, Background),
219    (LeaveChannelBuffer, Background),
220    (LeaveChannelChat, Foreground),
221    (LeaveProject, Foreground),
222    (LeaveRoom, Foreground),
223    (MarkNotificationRead, Foreground),
224    (MoveChannel, Foreground),
225    (OnTypeFormatting, Background),
226    (OnTypeFormattingResponse, Background),
227    (OpenBufferById, Background),
228    (OpenBufferByPath, Background),
229    (OpenBufferForSymbol, Background),
230    (OpenBufferForSymbolResponse, Background),
231    (OpenBufferResponse, Background),
232    (PerformRename, Background),
233    (PerformRenameResponse, Background),
234    (Ping, Foreground),
235    (PrepareRename, Background),
236    (PrepareRenameResponse, Background),
237    (ProjectEntryResponse, Foreground),
238    (RefreshInlayHints, Foreground),
239    (RejoinChannelBuffers, Foreground),
240    (RejoinChannelBuffersResponse, Foreground),
241    (RejoinRoom, Foreground),
242    (RejoinRoomResponse, Foreground),
243    (ReloadBuffers, Foreground),
244    (ReloadBuffersResponse, Foreground),
245    (RemoveChannelMember, Foreground),
246    (RemoveChannelMessage, Foreground),
247    (RemoveContact, Foreground),
248    (RemoveProjectCollaborator, Foreground),
249    (RenameChannel, Foreground),
250    (RenameChannelResponse, Foreground),
251    (RenameProjectEntry, Foreground),
252    (RequestContact, Foreground),
253    (ResolveCompletionDocumentation, Background),
254    (ResolveCompletionDocumentationResponse, Background),
255    (ResolveInlayHint, Background),
256    (ResolveInlayHintResponse, Background),
257    (RespondToChannelInvite, Foreground),
258    (RespondToContactRequest, Foreground),
259    (RoomUpdated, Foreground),
260    (SaveBuffer, Foreground),
261    (SetChannelMemberRole, Foreground),
262    (SetChannelVisibility, Foreground),
263    (SearchProject, Background),
264    (SearchProjectResponse, Background),
265    (SendChannelMessage, Background),
266    (SendChannelMessageResponse, Background),
267    (ShareProject, Foreground),
268    (ShareProjectResponse, Foreground),
269    (ShowContacts, Foreground),
270    (StartLanguageServer, Foreground),
271    (SynchronizeBuffers, Foreground),
272    (SynchronizeBuffersResponse, Foreground),
273    (Test, Foreground),
274    (Unfollow, Foreground),
275    (UnshareProject, Foreground),
276    (UpdateBuffer, Foreground),
277    (UpdateBufferFile, Foreground),
278    (UpdateChannelBuffer, Foreground),
279    (UpdateChannelBufferCollaborators, Foreground),
280    (UpdateChannels, Foreground),
281    (UpdateUserChannels, Foreground),
282    (UpdateContacts, Foreground),
283    (UpdateDiagnosticSummary, Foreground),
284    (UpdateDiffBase, Foreground),
285    (UpdateFollowers, Foreground),
286    (UpdateInviteInfo, Foreground),
287    (UpdateLanguageServer, Foreground),
288    (UpdateParticipantLocation, Foreground),
289    (UpdateProject, Foreground),
290    (UpdateProjectCollaborator, Foreground),
291    (UpdateWorktree, Foreground),
292    (UpdateWorktreeSettings, Foreground),
293    (UsersResponse, Foreground),
294    (LspExtExpandMacro, Background),
295    (LspExtExpandMacroResponse, Background),
296    (SetRoomParticipantRole, Foreground),
297);
298
299request_messages!(
300    (ApplyCodeAction, ApplyCodeActionResponse),
301    (
302        ApplyCompletionAdditionalEdits,
303        ApplyCompletionAdditionalEditsResponse
304    ),
305    (Call, Ack),
306    (CancelCall, Ack),
307    (CopyProjectEntry, ProjectEntryResponse),
308    (CompleteWithLanguageModel, LanguageModelResponse),
309    (CountTokensWithLanguageModel, CountTokensResponse),
310    (CreateChannel, CreateChannelResponse),
311    (CreateProjectEntry, ProjectEntryResponse),
312    (CreateRoom, CreateRoomResponse),
313    (DeclineCall, Ack),
314    (DeleteChannel, Ack),
315    (DeleteProjectEntry, ProjectEntryResponse),
316    (ExpandProjectEntry, ExpandProjectEntryResponse),
317    (Follow, FollowResponse),
318    (FormatBuffers, FormatBuffersResponse),
319    (FuzzySearchUsers, UsersResponse),
320    (GetChannelMembers, GetChannelMembersResponse),
321    (GetChannelMessages, GetChannelMessagesResponse),
322    (GetChannelMessagesById, GetChannelMessagesResponse),
323    (GetCodeActions, GetCodeActionsResponse),
324    (GetCompletions, GetCompletionsResponse),
325    (GetDefinition, GetDefinitionResponse),
326    (GetImplementation, GetImplementationResponse),
327    (GetDocumentHighlights, GetDocumentHighlightsResponse),
328    (GetHover, GetHoverResponse),
329    (GetNotifications, GetNotificationsResponse),
330    (GetPrivateUserInfo, GetPrivateUserInfoResponse),
331    (GetProjectSymbols, GetProjectSymbolsResponse),
332    (GetReferences, GetReferencesResponse),
333    (GetTypeDefinition, GetTypeDefinitionResponse),
334    (GetUsers, UsersResponse),
335    (IncomingCall, Ack),
336    (InlayHints, InlayHintsResponse),
337    (InviteChannelMember, Ack),
338    (JoinChannel, JoinRoomResponse),
339    (JoinChannelBuffer, JoinChannelBufferResponse),
340    (JoinChannelChat, JoinChannelChatResponse),
341    (JoinHostedProject, JoinProjectResponse),
342    (JoinProject, JoinProjectResponse),
343    (JoinRoom, JoinRoomResponse),
344    (LeaveChannelBuffer, Ack),
345    (LeaveRoom, Ack),
346    (MarkNotificationRead, Ack),
347    (MoveChannel, Ack),
348    (OnTypeFormatting, OnTypeFormattingResponse),
349    (OpenBufferById, OpenBufferResponse),
350    (OpenBufferByPath, OpenBufferResponse),
351    (OpenBufferForSymbol, OpenBufferForSymbolResponse),
352    (PerformRename, PerformRenameResponse),
353    (Ping, Ack),
354    (PrepareRename, PrepareRenameResponse),
355    (RefreshInlayHints, Ack),
356    (RejoinChannelBuffers, RejoinChannelBuffersResponse),
357    (RejoinRoom, RejoinRoomResponse),
358    (ReloadBuffers, ReloadBuffersResponse),
359    (RemoveChannelMember, Ack),
360    (RemoveChannelMessage, Ack),
361    (RemoveContact, Ack),
362    (RenameChannel, RenameChannelResponse),
363    (RenameProjectEntry, ProjectEntryResponse),
364    (RequestContact, Ack),
365    (
366        ResolveCompletionDocumentation,
367        ResolveCompletionDocumentationResponse
368    ),
369    (ResolveInlayHint, ResolveInlayHintResponse),
370    (RespondToChannelInvite, Ack),
371    (RespondToContactRequest, Ack),
372    (SaveBuffer, BufferSaved),
373    (SearchProject, SearchProjectResponse),
374    (SendChannelMessage, SendChannelMessageResponse),
375    (SetChannelMemberRole, Ack),
376    (SetChannelVisibility, Ack),
377    (ShareProject, ShareProjectResponse),
378    (SynchronizeBuffers, SynchronizeBuffersResponse),
379    (Test, Test),
380    (UpdateBuffer, Ack),
381    (UpdateParticipantLocation, Ack),
382    (UpdateProject, Ack),
383    (UpdateWorktree, Ack),
384    (LspExtExpandMacro, LspExtExpandMacroResponse),
385    (SetRoomParticipantRole, Ack),
386);
387
388entity_messages!(
389    {project_id, ShareProject},
390    AddProjectCollaborator,
391    ApplyCodeAction,
392    ApplyCompletionAdditionalEdits,
393    BufferReloaded,
394    BufferSaved,
395    CopyProjectEntry,
396    CreateBufferForPeer,
397    CreateProjectEntry,
398    DeleteProjectEntry,
399    ExpandProjectEntry,
400    FormatBuffers,
401    GetCodeActions,
402    GetCompletions,
403    GetDefinition,
404    GetImplementation,
405    GetDocumentHighlights,
406    GetHover,
407    GetProjectSymbols,
408    GetReferences,
409    GetTypeDefinition,
410    InlayHints,
411    JoinProject,
412    LeaveProject,
413    OnTypeFormatting,
414    OpenBufferById,
415    OpenBufferByPath,
416    OpenBufferForSymbol,
417    PerformRename,
418    PrepareRename,
419    RefreshInlayHints,
420    ReloadBuffers,
421    RemoveProjectCollaborator,
422    RenameProjectEntry,
423    ResolveCompletionDocumentation,
424    ResolveInlayHint,
425    SaveBuffer,
426    SearchProject,
427    StartLanguageServer,
428    SynchronizeBuffers,
429    UnshareProject,
430    UpdateBuffer,
431    UpdateBufferFile,
432    UpdateDiagnosticSummary,
433    UpdateDiffBase,
434    UpdateLanguageServer,
435    UpdateProject,
436    UpdateProjectCollaborator,
437    UpdateWorktree,
438    UpdateWorktreeSettings,
439    LspExtExpandMacro,
440);
441
442entity_messages!(
443    {channel_id, Channel},
444    ChannelMessageSent,
445    RemoveChannelMessage,
446    UpdateChannelBuffer,
447    UpdateChannelBufferCollaborators,
448);
449
450const KIB: usize = 1024;
451const MIB: usize = KIB * 1024;
452const MAX_BUFFER_LEN: usize = MIB;
453
454/// A stream of protobuf messages.
455pub struct MessageStream<S> {
456    stream: S,
457    encoding_buffer: Vec<u8>,
458}
459
460#[allow(clippy::large_enum_variant)]
461#[derive(Debug)]
462pub enum Message {
463    Envelope(Envelope),
464    Ping,
465    Pong,
466}
467
468impl<S> MessageStream<S> {
469    pub fn new(stream: S) -> Self {
470        Self {
471            stream,
472            encoding_buffer: Vec::new(),
473        }
474    }
475
476    pub fn inner_mut(&mut self) -> &mut S {
477        &mut self.stream
478    }
479}
480
481impl<S> MessageStream<S>
482where
483    S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
484{
485    pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
486        #[cfg(any(test, feature = "test-support"))]
487        const COMPRESSION_LEVEL: i32 = -7;
488
489        #[cfg(not(any(test, feature = "test-support")))]
490        const COMPRESSION_LEVEL: i32 = 4;
491
492        match message {
493            Message::Envelope(message) => {
494                self.encoding_buffer.reserve(message.encoded_len());
495                message
496                    .encode(&mut self.encoding_buffer)
497                    .map_err(io::Error::from)?;
498                let buffer =
499                    zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
500                        .unwrap();
501
502                self.encoding_buffer.clear();
503                self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
504                self.stream.send(WebSocketMessage::Binary(buffer)).await?;
505            }
506            Message::Ping => {
507                self.stream
508                    .send(WebSocketMessage::Ping(Default::default()))
509                    .await?;
510            }
511            Message::Pong => {
512                self.stream
513                    .send(WebSocketMessage::Pong(Default::default()))
514                    .await?;
515            }
516        }
517
518        Ok(())
519    }
520}
521
522impl<S> MessageStream<S>
523where
524    S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
525{
526    pub async fn read(&mut self) -> Result<(Message, Instant), anyhow::Error> {
527        while let Some(bytes) = self.stream.next().await {
528            let received_at = Instant::now();
529            match bytes? {
530                WebSocketMessage::Binary(bytes) => {
531                    zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
532                    let envelope = Envelope::decode(self.encoding_buffer.as_slice())
533                        .map_err(io::Error::from)?;
534
535                    self.encoding_buffer.clear();
536                    self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
537                    return Ok((Message::Envelope(envelope), received_at));
538                }
539                WebSocketMessage::Ping(_) => return Ok((Message::Ping, received_at)),
540                WebSocketMessage::Pong(_) => return Ok((Message::Pong, received_at)),
541                WebSocketMessage::Close(_) => break,
542                _ => {}
543            }
544        }
545        Err(anyhow!("connection closed"))
546    }
547}
548
549impl From<Timestamp> for SystemTime {
550    fn from(val: Timestamp) -> Self {
551        UNIX_EPOCH
552            .checked_add(Duration::new(val.seconds, val.nanos))
553            .unwrap()
554    }
555}
556
557impl From<SystemTime> for Timestamp {
558    fn from(time: SystemTime) -> Self {
559        let duration = time.duration_since(UNIX_EPOCH).unwrap();
560        Self {
561            seconds: duration.as_secs(),
562            nanos: duration.subsec_nanos(),
563        }
564    }
565}
566
567impl From<u128> for Nonce {
568    fn from(nonce: u128) -> Self {
569        let upper_half = (nonce >> 64) as u64;
570        let lower_half = nonce as u64;
571        Self {
572            upper_half,
573            lower_half,
574        }
575    }
576}
577
578impl From<Nonce> for u128 {
579    fn from(nonce: Nonce) -> Self {
580        let upper_half = (nonce.upper_half as u128) << 64;
581        let lower_half = nonce.lower_half as u128;
582        upper_half | lower_half
583    }
584}
585
586pub fn split_worktree_update(
587    mut message: UpdateWorktree,
588    max_chunk_size: usize,
589) -> impl Iterator<Item = UpdateWorktree> {
590    let mut done_files = false;
591
592    let mut repository_map = message
593        .updated_repositories
594        .into_iter()
595        .map(|repo| (repo.work_directory_id, repo))
596        .collect::<HashMap<_, _>>();
597
598    iter::from_fn(move || {
599        if done_files {
600            return None;
601        }
602
603        let updated_entries_chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
604        let updated_entries: Vec<_> = message
605            .updated_entries
606            .drain(..updated_entries_chunk_size)
607            .collect();
608
609        let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size);
610        let removed_entries = message
611            .removed_entries
612            .drain(..removed_entries_chunk_size)
613            .collect();
614
615        done_files = message.updated_entries.is_empty() && message.removed_entries.is_empty();
616
617        let mut updated_repositories = Vec::new();
618
619        if !repository_map.is_empty() {
620            for entry in &updated_entries {
621                if let Some(repo) = repository_map.remove(&entry.id) {
622                    updated_repositories.push(repo)
623                }
624            }
625        }
626
627        let removed_repositories = if done_files {
628            mem::take(&mut message.removed_repositories)
629        } else {
630            Default::default()
631        };
632
633        if done_files {
634            updated_repositories.extend(mem::take(&mut repository_map).into_values());
635        }
636
637        Some(UpdateWorktree {
638            project_id: message.project_id,
639            worktree_id: message.worktree_id,
640            root_name: message.root_name.clone(),
641            abs_path: message.abs_path.clone(),
642            updated_entries,
643            removed_entries,
644            scan_id: message.scan_id,
645            is_last_update: done_files && message.is_last_update,
646            updated_repositories,
647            removed_repositories,
648        })
649    })
650}
651
652#[cfg(test)]
653mod tests {
654    use super::*;
655
656    #[gpui::test]
657    async fn test_buffer_size() {
658        let (tx, rx) = futures::channel::mpsc::unbounded();
659        let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
660        sink.write(Message::Envelope(Envelope {
661            payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
662                root_name: "abcdefg".repeat(10),
663                ..Default::default()
664            })),
665            ..Default::default()
666        }))
667        .await
668        .unwrap();
669        assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
670        sink.write(Message::Envelope(Envelope {
671            payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
672                root_name: "abcdefg".repeat(1000000),
673                ..Default::default()
674            })),
675            ..Default::default()
676        }))
677        .await
678        .unwrap();
679        assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
680
681        let mut stream = MessageStream::new(rx.map(anyhow::Ok));
682        stream.read().await.unwrap();
683        assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
684        stream.read().await.unwrap();
685        assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
686    }
687
688    #[gpui::test]
689    fn test_converting_peer_id_from_and_to_u64() {
690        let peer_id = PeerId {
691            owner_id: 10,
692            id: 3,
693        };
694        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
695        let peer_id = PeerId {
696            owner_id: u32::MAX,
697            id: 3,
698        };
699        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
700        let peer_id = PeerId {
701            owner_id: 10,
702            id: u32::MAX,
703        };
704        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
705        let peer_id = PeerId {
706            owner_id: u32::MAX,
707            id: u32::MAX,
708        };
709        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
710    }
711}