proto.rs

  1#![allow(non_snake_case)]
  2
  3use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
  4use anyhow::{anyhow, Result};
  5use async_tungstenite::tungstenite::Message as WebSocketMessage;
  6use collections::HashMap;
  7use futures::{SinkExt as _, StreamExt as _};
  8use prost::Message as _;
  9use serde::Serialize;
 10use std::any::{Any, TypeId};
 11use std::time::Instant;
 12use std::{
 13    cmp,
 14    fmt::Debug,
 15    io, iter,
 16    time::{Duration, SystemTime, UNIX_EPOCH},
 17};
 18use std::{fmt, mem};
 19
// Textually includes `zed.messages.rs` from the build's `OUT_DIR`; the message structs used
// throughout this file (`Envelope`, `PeerId`, `UpdateWorktree`, ...) are not written here, so
// they presumably come from that generated file — TODO confirm against the build script.
include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
 21
/// A message type that can be wrapped in, and recovered from, an [`Envelope`].
pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
    /// Name of the message type; exposed through `AnyTypedEnvelope::payload_type_name`.
    const NAME: &'static str;
    /// Priority class of the message type; `AnyTypedEnvelope::is_background` reports whether
    /// this is `MessagePriority::Background`.
    const PRIORITY: MessagePriority;
    /// Consumes the message and wraps it in an [`Envelope`].
    ///
    /// `id`, `responding_to` and `original_sender_id` presumably populate the envelope fields
    /// of the same names — the implementations are produced outside this file; confirm there.
    fn into_envelope(
        self,
        id: u32,
        responding_to: Option<u32>,
        original_sender_id: Option<PeerId>,
    ) -> Envelope;
    /// Tries to extract a message of this type from `envelope`, returning `None` when that is
    /// not possible (presumably when the payload holds a different message type — confirm).
    fn from_envelope(envelope: Envelope) -> Option<Self>;
}
 33
/// An [`EnvelopedMessage`] that carries the id of a remote entity.
pub trait EntityMessage: EnvelopedMessage {
    /// Type associated with the message. NOTE(review): presumably the type named in the braces
    /// of the `entity_messages!` invocations (`ShareProject`, `Channel`) — confirm in the macro.
    type Entity;
    /// Returns the `u64` id of the remote entity this message refers to.
    fn remote_entity_id(&self) -> u64;
}
 38
/// An [`EnvelopedMessage`] that is paired with a response message type.
pub trait RequestMessage: EnvelopedMessage {
    /// The message type paired with this request; it must itself be an [`EnvelopedMessage`].
    type Response: EnvelopedMessage;
}
 42
/// Object-safe, type-erased view of a `TypedEnvelope<T>`.
///
/// The only implementation in this file is the blanket one for `TypedEnvelope<T>` where
/// `T: EnvelopedMessage`; the method descriptions below describe that implementation.
pub trait AnyTypedEnvelope: 'static + Send + Sync {
    /// `TypeId` of the payload type `T` (not of the envelope wrapper).
    fn payload_type_id(&self) -> TypeId;
    /// The payload type's `EnvelopedMessage::NAME`.
    fn payload_type_name(&self) -> &'static str;
    /// Borrows the envelope as `&dyn Any` so callers can downcast it.
    fn as_any(&self) -> &dyn Any;
    /// Converts the boxed envelope into a boxed `Any` so callers can downcast by value.
    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
    /// `true` when the payload type's priority is `MessagePriority::Background`.
    fn is_background(&self) -> bool;
    /// The envelope's `original_sender_id` field, if set.
    fn original_sender_id(&self) -> Option<PeerId>;
    /// The envelope's `sender_id` field.
    fn sender_id(&self) -> ConnectionId;
    /// The envelope's `message_id` field.
    fn message_id(&self) -> u32;
}
 53
/// Priority class attached to every message type through `EnvelopedMessage::PRIORITY`.
pub enum MessagePriority {
    /// Message types for which `AnyTypedEnvelope::is_background` returns `false`.
    Foreground,
    /// Message types for which `AnyTypedEnvelope::is_background` returns `true`.
    Background,
}
 58
 59impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
 60    fn payload_type_id(&self) -> TypeId {
 61        TypeId::of::<T>()
 62    }
 63
 64    fn payload_type_name(&self) -> &'static str {
 65        T::NAME
 66    }
 67
 68    fn as_any(&self) -> &dyn Any {
 69        self
 70    }
 71
 72    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
 73        self
 74    }
 75
 76    fn is_background(&self) -> bool {
 77        matches!(T::PRIORITY, MessagePriority::Background)
 78    }
 79
 80    fn original_sender_id(&self) -> Option<PeerId> {
 81        self.original_sender_id
 82    }
 83
 84    fn sender_id(&self) -> ConnectionId {
 85        self.sender_id
 86    }
 87
 88    fn message_id(&self) -> u32 {
 89        self.message_id
 90    }
 91}
 92
 93impl PeerId {
 94    pub fn from_u64(peer_id: u64) -> Self {
 95        let owner_id = (peer_id >> 32) as u32;
 96        let id = peer_id as u32;
 97        Self { owner_id, id }
 98    }
 99
100    pub fn as_u64(self) -> u64 {
101        ((self.owner_id as u64) << 32) | (self.id as u64)
102    }
103}
104
// `PeerId` is built from two `u32` values (see `PeerId::from_u64`), so copying it is cheap;
// `as_u64` relies on this to take `self` by value without consuming the caller's id.
impl Copy for PeerId {}

// Marker impl asserting that `PeerId` equality is a full equivalence relation. The `PartialEq`
// impl it builds on is not written in this file — presumably derived on the generated struct.
impl Eq for PeerId {}
108
109impl Ord for PeerId {
110    fn cmp(&self, other: &Self) -> cmp::Ordering {
111        self.owner_id
112            .cmp(&other.owner_id)
113            .then_with(|| self.id.cmp(&other.id))
114    }
115}
116
117impl PartialOrd for PeerId {
118    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
119        Some(self.cmp(other))
120    }
121}
122
123impl std::hash::Hash for PeerId {
124    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
125        self.owner_id.hash(state);
126        self.id.hash(state);
127    }
128}
129
130impl fmt::Display for PeerId {
131    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
132        write!(f, "{}/{}", self.owner_id, self.id)
133    }
134}
135
// Table of message types and their priority class.
//
// Each row is `(MessageType, Priority)`, where the priority is one of the `MessagePriority`
// variants (`Foreground` or `Background`). The `messages!` macro is imported from the parent
// module; it presumably implements `EnvelopedMessage` for every listed type, using the row's
// priority for `PRIORITY` — confirm against the macro definition.
messages!(
    (Ack, Foreground),
    (AckBufferOperation, Background),
    (AckChannelMessage, Background),
    (AddNotification, Foreground),
    (AddProjectCollaborator, Foreground),
    (ApplyCodeAction, Background),
    (ApplyCodeActionResponse, Background),
    (ApplyCompletionAdditionalEdits, Background),
    (ApplyCompletionAdditionalEditsResponse, Background),
    (BufferReloaded, Foreground),
    (BufferSaved, Foreground),
    (Call, Foreground),
    (CallCanceled, Foreground),
    (CancelCall, Foreground),
    (ChannelMessageSent, Foreground),
    (ChannelMessageUpdate, Foreground),
    (CompleteWithLanguageModel, Background),
    (CopyProjectEntry, Foreground),
    (CountTokensWithLanguageModel, Background),
    (CountTokensResponse, Background),
    (CreateBufferForPeer, Foreground),
    (CreateChannel, Foreground),
    (CreateChannelResponse, Foreground),
    (CreateProjectEntry, Foreground),
    (CreateRoom, Foreground),
    (CreateRoomResponse, Foreground),
    (DeclineCall, Foreground),
    (DeleteChannel, Foreground),
    (DeleteNotification, Foreground),
    (DeleteProjectEntry, Foreground),
    (EndStream, Foreground),
    (Error, Foreground),
    (ExpandProjectEntry, Foreground),
    (ExpandProjectEntryResponse, Foreground),
    (Follow, Foreground),
    (FollowResponse, Foreground),
    (FormatBuffers, Foreground),
    (FormatBuffersResponse, Foreground),
    (FuzzySearchUsers, Foreground),
    (GetChannelMembers, Foreground),
    (GetChannelMembersResponse, Foreground),
    (GetChannelMessages, Background),
    (GetChannelMessagesById, Background),
    (GetChannelMessagesResponse, Background),
    (GetCodeActions, Background),
    (GetCodeActionsResponse, Background),
    (GetCompletions, Background),
    (GetCompletionsResponse, Background),
    (GetDefinition, Background),
    (GetDefinitionResponse, Background),
    (GetDocumentHighlights, Background),
    (GetDocumentHighlightsResponse, Background),
    (GetHover, Background),
    (GetHoverResponse, Background),
    (GetNotifications, Foreground),
    (GetNotificationsResponse, Foreground),
    (GetPrivateUserInfo, Foreground),
    (GetPrivateUserInfoResponse, Foreground),
    (GetProjectSymbols, Background),
    (GetProjectSymbolsResponse, Background),
    (GetReferences, Background),
    (GetReferencesResponse, Background),
    (GetTypeDefinition, Background),
    (GetTypeDefinitionResponse, Background),
    (GetImplementation, Background),
    (GetImplementationResponse, Background),
    (GetUsers, Foreground),
    (Hello, Foreground),
    (IncomingCall, Foreground),
    (InlayHints, Background),
    (InlayHintsResponse, Background),
    (InviteChannelMember, Foreground),
    (JoinChannel, Foreground),
    (JoinChannelBuffer, Foreground),
    (JoinChannelBufferResponse, Foreground),
    (JoinChannelChat, Foreground),
    (JoinChannelChatResponse, Foreground),
    (JoinProject, Foreground),
    (JoinHostedProject, Foreground),
    (JoinProjectResponse, Foreground),
    (JoinRoom, Foreground),
    (JoinRoomResponse, Foreground),
    (LanguageModelResponse, Background),
    (LeaveChannelBuffer, Background),
    (LeaveChannelChat, Foreground),
    (LeaveProject, Foreground),
    (LeaveRoom, Foreground),
    (MarkNotificationRead, Foreground),
    (MoveChannel, Foreground),
    (OnTypeFormatting, Background),
    (OnTypeFormattingResponse, Background),
    (OpenBufferById, Background),
    (OpenBufferByPath, Background),
    (OpenBufferForSymbol, Background),
    (OpenBufferForSymbolResponse, Background),
    (OpenBufferResponse, Background),
    (PerformRename, Background),
    (PerformRenameResponse, Background),
    (Ping, Foreground),
    (PrepareRename, Background),
    (PrepareRenameResponse, Background),
    (ProjectEntryResponse, Foreground),
    (RefreshInlayHints, Foreground),
    (RejoinChannelBuffers, Foreground),
    (RejoinChannelBuffersResponse, Foreground),
    (RejoinRoom, Foreground),
    (RejoinRoomResponse, Foreground),
    (ReloadBuffers, Foreground),
    (ReloadBuffersResponse, Foreground),
    (RemoveChannelMember, Foreground),
    (RemoveChannelMessage, Foreground),
    (UpdateChannelMessage, Foreground),
    (RemoveContact, Foreground),
    (RemoveProjectCollaborator, Foreground),
    (RenameChannel, Foreground),
    (RenameChannelResponse, Foreground),
    (RenameProjectEntry, Foreground),
    (RequestContact, Foreground),
    (ResolveCompletionDocumentation, Background),
    (ResolveCompletionDocumentationResponse, Background),
    (ResolveInlayHint, Background),
    (ResolveInlayHintResponse, Background),
    (RespondToChannelInvite, Foreground),
    (RespondToContactRequest, Foreground),
    (RoomUpdated, Foreground),
    (SaveBuffer, Foreground),
    (SetChannelMemberRole, Foreground),
    (SetChannelVisibility, Foreground),
    (SearchProject, Background),
    (SearchProjectResponse, Background),
    (SendChannelMessage, Background),
    (SendChannelMessageResponse, Background),
    (ShareProject, Foreground),
    (ShareProjectResponse, Foreground),
    (ShowContacts, Foreground),
    (StartLanguageServer, Foreground),
    (SynchronizeBuffers, Foreground),
    (SynchronizeBuffersResponse, Foreground),
    (Test, Foreground),
    (Unfollow, Foreground),
    (UnshareProject, Foreground),
    (UpdateBuffer, Foreground),
    (UpdateBufferFile, Foreground),
    (UpdateChannelBuffer, Foreground),
    (UpdateChannelBufferCollaborators, Foreground),
    (UpdateChannels, Foreground),
    (UpdateUserChannels, Foreground),
    (UpdateContacts, Foreground),
    (UpdateDiagnosticSummary, Foreground),
    (UpdateDiffBase, Foreground),
    (UpdateFollowers, Foreground),
    (UpdateInviteInfo, Foreground),
    (UpdateLanguageServer, Foreground),
    (UpdateParticipantLocation, Foreground),
    (UpdateProject, Foreground),
    (UpdateProjectCollaborator, Foreground),
    (UpdateWorktree, Foreground),
    (UpdateWorktreeSettings, Foreground),
    (UsersResponse, Foreground),
    (LspExtExpandMacro, Background),
    (LspExtExpandMacroResponse, Background),
    (SetRoomParticipantRole, Foreground),
    (BlameBuffer, Foreground),
    (BlameBufferResponse, Foreground),
);
302
// Table pairing each request message with the message type sent in reply.
//
// Each row is `(Request, Response)`. Several requests share one response type (for example
// `Ack`, `UsersResponse`, `ProjectEntryResponse`). The `request_messages!` macro is imported
// from the parent module; it presumably implements `RequestMessage` for the first type with
// the second as its `Response` — confirm against the macro definition.
request_messages!(
    (ApplyCodeAction, ApplyCodeActionResponse),
    (
        ApplyCompletionAdditionalEdits,
        ApplyCompletionAdditionalEditsResponse
    ),
    (Call, Ack),
    (CancelCall, Ack),
    (CopyProjectEntry, ProjectEntryResponse),
    (CompleteWithLanguageModel, LanguageModelResponse),
    (CountTokensWithLanguageModel, CountTokensResponse),
    (CreateChannel, CreateChannelResponse),
    (CreateProjectEntry, ProjectEntryResponse),
    (CreateRoom, CreateRoomResponse),
    (DeclineCall, Ack),
    (DeleteChannel, Ack),
    (DeleteProjectEntry, ProjectEntryResponse),
    (ExpandProjectEntry, ExpandProjectEntryResponse),
    (Follow, FollowResponse),
    (FormatBuffers, FormatBuffersResponse),
    (FuzzySearchUsers, UsersResponse),
    (GetChannelMembers, GetChannelMembersResponse),
    (GetChannelMessages, GetChannelMessagesResponse),
    (GetChannelMessagesById, GetChannelMessagesResponse),
    (GetCodeActions, GetCodeActionsResponse),
    (GetCompletions, GetCompletionsResponse),
    (GetDefinition, GetDefinitionResponse),
    (GetImplementation, GetImplementationResponse),
    (GetDocumentHighlights, GetDocumentHighlightsResponse),
    (GetHover, GetHoverResponse),
    (GetNotifications, GetNotificationsResponse),
    (GetPrivateUserInfo, GetPrivateUserInfoResponse),
    (GetProjectSymbols, GetProjectSymbolsResponse),
    (GetReferences, GetReferencesResponse),
    (GetTypeDefinition, GetTypeDefinitionResponse),
    (GetUsers, UsersResponse),
    (IncomingCall, Ack),
    (InlayHints, InlayHintsResponse),
    (InviteChannelMember, Ack),
    (JoinChannel, JoinRoomResponse),
    (JoinChannelBuffer, JoinChannelBufferResponse),
    (JoinChannelChat, JoinChannelChatResponse),
    (JoinHostedProject, JoinProjectResponse),
    (JoinProject, JoinProjectResponse),
    (JoinRoom, JoinRoomResponse),
    (LeaveChannelBuffer, Ack),
    (LeaveRoom, Ack),
    (MarkNotificationRead, Ack),
    (MoveChannel, Ack),
    (OnTypeFormatting, OnTypeFormattingResponse),
    (OpenBufferById, OpenBufferResponse),
    (OpenBufferByPath, OpenBufferResponse),
    (OpenBufferForSymbol, OpenBufferForSymbolResponse),
    (PerformRename, PerformRenameResponse),
    (Ping, Ack),
    (PrepareRename, PrepareRenameResponse),
    (RefreshInlayHints, Ack),
    (RejoinChannelBuffers, RejoinChannelBuffersResponse),
    (RejoinRoom, RejoinRoomResponse),
    (ReloadBuffers, ReloadBuffersResponse),
    (RemoveChannelMember, Ack),
    (RemoveChannelMessage, Ack),
    (UpdateChannelMessage, Ack),
    (RemoveContact, Ack),
    (RenameChannel, RenameChannelResponse),
    (RenameProjectEntry, ProjectEntryResponse),
    (RequestContact, Ack),
    (
        ResolveCompletionDocumentation,
        ResolveCompletionDocumentationResponse
    ),
    (ResolveInlayHint, ResolveInlayHintResponse),
    (RespondToChannelInvite, Ack),
    (RespondToContactRequest, Ack),
    (SaveBuffer, BufferSaved),
    (SearchProject, SearchProjectResponse),
    (SendChannelMessage, SendChannelMessageResponse),
    (SetChannelMemberRole, Ack),
    (SetChannelVisibility, Ack),
    (ShareProject, ShareProjectResponse),
    (SynchronizeBuffers, SynchronizeBuffersResponse),
    (Test, Test),
    (UpdateBuffer, Ack),
    (UpdateParticipantLocation, Ack),
    (UpdateProject, Ack),
    (UpdateWorktree, Ack),
    (LspExtExpandMacro, LspExtExpandMacroResponse),
    (SetRoomParticipantRole, Ack),
    (BlameBuffer, BlameBufferResponse),
);
393
// Message types that belong to a project.
//
// The braces hold `project_id` and `ShareProject`; the remaining items are message types.
// NOTE(review): presumably `project_id` is the field returned by
// `EntityMessage::remote_entity_id` and `ShareProject` becomes `EntityMessage::Entity` for
// every listed type — confirm against the `entity_messages!` macro in the parent module.
entity_messages!(
    {project_id, ShareProject},
    AddProjectCollaborator,
    ApplyCodeAction,
    ApplyCompletionAdditionalEdits,
    BlameBuffer,
    BufferReloaded,
    BufferSaved,
    CopyProjectEntry,
    CreateBufferForPeer,
    CreateProjectEntry,
    DeleteProjectEntry,
    ExpandProjectEntry,
    FormatBuffers,
    GetCodeActions,
    GetCompletions,
    GetDefinition,
    GetImplementation,
    GetDocumentHighlights,
    GetHover,
    GetProjectSymbols,
    GetReferences,
    GetTypeDefinition,
    InlayHints,
    JoinProject,
    LeaveProject,
    OnTypeFormatting,
    OpenBufferById,
    OpenBufferByPath,
    OpenBufferForSymbol,
    PerformRename,
    PrepareRename,
    RefreshInlayHints,
    ReloadBuffers,
    RemoveProjectCollaborator,
    RenameProjectEntry,
    ResolveCompletionDocumentation,
    ResolveInlayHint,
    SaveBuffer,
    SearchProject,
    StartLanguageServer,
    SynchronizeBuffers,
    UnshareProject,
    UpdateBuffer,
    UpdateBufferFile,
    UpdateDiagnosticSummary,
    UpdateDiffBase,
    UpdateLanguageServer,
    UpdateProject,
    UpdateProjectCollaborator,
    UpdateWorktree,
    UpdateWorktreeSettings,
    LspExtExpandMacro,
);
448
// Message types that belong to a channel.
//
// The braces hold `channel_id` and `Channel`; the remaining items are message types.
// NOTE(review): presumably `channel_id` is the field returned by
// `EntityMessage::remote_entity_id` and `Channel` becomes `EntityMessage::Entity` for every
// listed type — confirm against the `entity_messages!` macro in the parent module.
entity_messages!(
    {channel_id, Channel},
    ChannelMessageSent,
    ChannelMessageUpdate,
    RemoveChannelMessage,
    UpdateChannelMessage,
    UpdateChannelBuffer,
    UpdateChannelBufferCollaborators,
);
458
/// Number of bytes in one kibibyte.
const KIB: usize = 1024;
/// Number of bytes in one mebibyte.
const MIB: usize = KIB * 1024;
/// Capacity (1 MiB) that `MessageStream` shrinks its scratch buffer back down to after each
/// message, so a single oversized message does not pin a large allocation.
const MAX_BUFFER_LEN: usize = MIB;
462
/// A stream of protobuf messages.
///
/// Wraps a transport `S`. `write` is available when `S` is a sink of WebSocket messages and
/// `read` when `S` is a stream of them.
pub struct MessageStream<S> {
    /// The wrapped transport.
    stream: S,
    /// Scratch buffer reused between calls: it holds the uncompressed protobuf bytes while
    /// writing and the decompressed bytes while reading, and is cleared after each message.
    encoding_buffer: Vec<u8>,
}
468
/// A unit of traffic that a `MessageStream` can write or read.
// NOTE(review): the lint is silenced presumably because `Envelope` is much larger than the
// two unit variants — confirm before boxing it instead.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum Message {
    /// A protobuf `Envelope`, carried as a zstd-compressed binary WebSocket message.
    Envelope(Envelope),
    /// A WebSocket ping. Written with an empty payload; a received payload is discarded.
    Ping,
    /// A WebSocket pong. Written with an empty payload; a received payload is discarded.
    Pong,
}
476
477impl<S> MessageStream<S> {
478    pub fn new(stream: S) -> Self {
479        Self {
480            stream,
481            encoding_buffer: Vec::new(),
482        }
483    }
484
485    pub fn inner_mut(&mut self) -> &mut S {
486        &mut self.stream
487    }
488}
489
490impl<S> MessageStream<S>
491where
492    S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
493{
494    pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
495        #[cfg(any(test, feature = "test-support"))]
496        const COMPRESSION_LEVEL: i32 = -7;
497
498        #[cfg(not(any(test, feature = "test-support")))]
499        const COMPRESSION_LEVEL: i32 = 4;
500
501        match message {
502            Message::Envelope(message) => {
503                self.encoding_buffer.reserve(message.encoded_len());
504                message
505                    .encode(&mut self.encoding_buffer)
506                    .map_err(io::Error::from)?;
507                let buffer =
508                    zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
509                        .unwrap();
510
511                self.encoding_buffer.clear();
512                self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
513                self.stream.send(WebSocketMessage::Binary(buffer)).await?;
514            }
515            Message::Ping => {
516                self.stream
517                    .send(WebSocketMessage::Ping(Default::default()))
518                    .await?;
519            }
520            Message::Pong => {
521                self.stream
522                    .send(WebSocketMessage::Pong(Default::default()))
523                    .await?;
524            }
525        }
526
527        Ok(())
528    }
529}
530
531impl<S> MessageStream<S>
532where
533    S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
534{
535    pub async fn read(&mut self) -> Result<(Message, Instant), anyhow::Error> {
536        while let Some(bytes) = self.stream.next().await {
537            let received_at = Instant::now();
538            match bytes? {
539                WebSocketMessage::Binary(bytes) => {
540                    zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
541                    let envelope = Envelope::decode(self.encoding_buffer.as_slice())
542                        .map_err(io::Error::from)?;
543
544                    self.encoding_buffer.clear();
545                    self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
546                    return Ok((Message::Envelope(envelope), received_at));
547                }
548                WebSocketMessage::Ping(_) => return Ok((Message::Ping, received_at)),
549                WebSocketMessage::Pong(_) => return Ok((Message::Pong, received_at)),
550                WebSocketMessage::Close(_) => break,
551                _ => {}
552            }
553        }
554        Err(anyhow!("connection closed"))
555    }
556}
557
558impl From<Timestamp> for SystemTime {
559    fn from(val: Timestamp) -> Self {
560        UNIX_EPOCH
561            .checked_add(Duration::new(val.seconds, val.nanos))
562            .unwrap()
563    }
564}
565
566impl From<SystemTime> for Timestamp {
567    fn from(time: SystemTime) -> Self {
568        let duration = time.duration_since(UNIX_EPOCH).unwrap();
569        Self {
570            seconds: duration.as_secs(),
571            nanos: duration.subsec_nanos(),
572        }
573    }
574}
575
576impl From<u128> for Nonce {
577    fn from(nonce: u128) -> Self {
578        let upper_half = (nonce >> 64) as u64;
579        let lower_half = nonce as u64;
580        Self {
581            upper_half,
582            lower_half,
583        }
584    }
585}
586
587impl From<Nonce> for u128 {
588    fn from(nonce: Nonce) -> Self {
589        let upper_half = (nonce.upper_half as u128) << 64;
590        let lower_half = nonce.lower_half as u128;
591        upper_half | lower_half
592    }
593}
594
595pub fn split_worktree_update(
596    mut message: UpdateWorktree,
597    max_chunk_size: usize,
598) -> impl Iterator<Item = UpdateWorktree> {
599    let mut done_files = false;
600
601    let mut repository_map = message
602        .updated_repositories
603        .into_iter()
604        .map(|repo| (repo.work_directory_id, repo))
605        .collect::<HashMap<_, _>>();
606
607    iter::from_fn(move || {
608        if done_files {
609            return None;
610        }
611
612        let updated_entries_chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
613        let updated_entries: Vec<_> = message
614            .updated_entries
615            .drain(..updated_entries_chunk_size)
616            .collect();
617
618        let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size);
619        let removed_entries = message
620            .removed_entries
621            .drain(..removed_entries_chunk_size)
622            .collect();
623
624        done_files = message.updated_entries.is_empty() && message.removed_entries.is_empty();
625
626        let mut updated_repositories = Vec::new();
627
628        if !repository_map.is_empty() {
629            for entry in &updated_entries {
630                if let Some(repo) = repository_map.remove(&entry.id) {
631                    updated_repositories.push(repo)
632                }
633            }
634        }
635
636        let removed_repositories = if done_files {
637            mem::take(&mut message.removed_repositories)
638        } else {
639            Default::default()
640        };
641
642        if done_files {
643            updated_repositories.extend(mem::take(&mut repository_map).into_values());
644        }
645
646        Some(UpdateWorktree {
647            project_id: message.project_id,
648            worktree_id: message.worktree_id,
649            root_name: message.root_name.clone(),
650            abs_path: message.abs_path.clone(),
651            updated_entries,
652            removed_entries,
653            scan_id: message.scan_id,
654            is_last_update: done_files && message.is_last_update,
655            updated_repositories,
656            removed_repositories,
657        })
658    })
659}
660
#[cfg(test)]
mod tests {
    use super::*;

    /// Writes and reads one small and one very large envelope, checking after each operation
    /// that the scratch buffer's capacity has been brought back under `MAX_BUFFER_LEN`.
    #[gpui::test]
    async fn test_buffer_size() {
        // Builds an `UpdateWorktree` envelope whose size is driven by the `root_name` length.
        fn worktree_envelope(root_name_repeats: usize) -> Message {
            Message::Envelope(Envelope {
                payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
                    root_name: "abcdefg".repeat(root_name_repeats),
                    ..Default::default()
                })),
                ..Default::default()
            })
        }

        let (tx, rx) = futures::channel::mpsc::unbounded();

        let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
        for root_name_repeats in [10, 1000000] {
            sink.write(worktree_envelope(root_name_repeats))
                .await
                .unwrap();
            assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
        }

        let mut stream = MessageStream::new(rx.map(anyhow::Ok));
        for _ in 0..2 {
            stream.read().await.unwrap();
            assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
        }
    }

    /// Round-trips peer ids through `as_u64` / `from_u64`, covering the maximum value in
    /// either half and in both halves at once.
    #[gpui::test]
    fn test_converting_peer_id_from_and_to_u64() {
        let cases = [
            (10, 3),
            (u32::MAX, 3),
            (10, u32::MAX),
            (u32::MAX, u32::MAX),
        ];
        for (owner_id, id) in cases {
            let peer_id = PeerId { owner_id, id };
            assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
        }
    }
}