1#![allow(non_snake_case)]
2
3use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
4use anyhow::{anyhow, Result};
5use async_tungstenite::tungstenite::Message as WebSocketMessage;
6use collections::HashMap;
7use futures::{SinkExt as _, StreamExt as _};
8use prost::Message as _;
9use serde::Serialize;
10use std::any::{Any, TypeId};
11use std::time::Instant;
12use std::{
13 cmp,
14 fmt::Debug,
15 io, iter,
16 time::{Duration, SystemTime, UNIX_EPOCH},
17};
18use std::{fmt, mem};
19
20include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
21
22pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
23 const NAME: &'static str;
24 const PRIORITY: MessagePriority;
25 fn into_envelope(
26 self,
27 id: u32,
28 responding_to: Option<u32>,
29 original_sender_id: Option<PeerId>,
30 ) -> Envelope;
31 fn from_envelope(envelope: Envelope) -> Option<Self>;
32}
33
34pub trait EntityMessage: EnvelopedMessage {
35 type Entity;
36 fn remote_entity_id(&self) -> u64;
37}
38
39pub trait RequestMessage: EnvelopedMessage {
40 type Response: EnvelopedMessage;
41}
42
43pub trait AnyTypedEnvelope: 'static + Send + Sync {
44 fn payload_type_id(&self) -> TypeId;
45 fn payload_type_name(&self) -> &'static str;
46 fn as_any(&self) -> &dyn Any;
47 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
48 fn is_background(&self) -> bool;
49 fn original_sender_id(&self) -> Option<PeerId>;
50 fn sender_id(&self) -> ConnectionId;
51 fn message_id(&self) -> u32;
52}
53
54pub enum MessagePriority {
55 Foreground,
56 Background,
57}
58
59impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
60 fn payload_type_id(&self) -> TypeId {
61 TypeId::of::<T>()
62 }
63
64 fn payload_type_name(&self) -> &'static str {
65 T::NAME
66 }
67
68 fn as_any(&self) -> &dyn Any {
69 self
70 }
71
72 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
73 self
74 }
75
76 fn is_background(&self) -> bool {
77 matches!(T::PRIORITY, MessagePriority::Background)
78 }
79
80 fn original_sender_id(&self) -> Option<PeerId> {
81 self.original_sender_id
82 }
83
84 fn sender_id(&self) -> ConnectionId {
85 self.sender_id
86 }
87
88 fn message_id(&self) -> u32 {
89 self.message_id
90 }
91}
92
93impl PeerId {
94 pub fn from_u64(peer_id: u64) -> Self {
95 let owner_id = (peer_id >> 32) as u32;
96 let id = peer_id as u32;
97 Self { owner_id, id }
98 }
99
100 pub fn as_u64(self) -> u64 {
101 ((self.owner_id as u64) << 32) | (self.id as u64)
102 }
103}
104
105impl Copy for PeerId {}
106
107impl Eq for PeerId {}
108
109impl Ord for PeerId {
110 fn cmp(&self, other: &Self) -> cmp::Ordering {
111 self.owner_id
112 .cmp(&other.owner_id)
113 .then_with(|| self.id.cmp(&other.id))
114 }
115}
116
117impl PartialOrd for PeerId {
118 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
119 Some(self.cmp(other))
120 }
121}
122
123impl std::hash::Hash for PeerId {
124 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
125 self.owner_id.hash(state);
126 self.id.hash(state);
127 }
128}
129
130impl fmt::Display for PeerId {
131 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
132 write!(f, "{}/{}", self.owner_id, self.id)
133 }
134}
135
136messages!(
137 (Ack, Foreground),
138 (AckBufferOperation, Background),
139 (AckChannelMessage, Background),
140 (AddNotification, Foreground),
141 (AddProjectCollaborator, Foreground),
142 (ApplyCodeAction, Background),
143 (ApplyCodeActionResponse, Background),
144 (ApplyCompletionAdditionalEdits, Background),
145 (ApplyCompletionAdditionalEditsResponse, Background),
146 (BufferReloaded, Foreground),
147 (BufferSaved, Foreground),
148 (Call, Foreground),
149 (CallCanceled, Foreground),
150 (CancelCall, Foreground),
151 (ChannelMessageSent, Foreground),
152 (ChannelMessageUpdate, Foreground),
153 (CompleteWithLanguageModel, Background),
154 (CopyProjectEntry, Foreground),
155 (CountTokensWithLanguageModel, Background),
156 (CountTokensResponse, Background),
157 (CreateBufferForPeer, Foreground),
158 (CreateChannel, Foreground),
159 (CreateChannelResponse, Foreground),
160 (CreateProjectEntry, Foreground),
161 (CreateRoom, Foreground),
162 (CreateRoomResponse, Foreground),
163 (DeclineCall, Foreground),
164 (DeleteChannel, Foreground),
165 (DeleteNotification, Foreground),
166 (UpdateNotification, Foreground),
167 (DeleteProjectEntry, Foreground),
168 (EndStream, Foreground),
169 (Error, Foreground),
170 (ExpandProjectEntry, Foreground),
171 (ExpandProjectEntryResponse, Foreground),
172 (Follow, Foreground),
173 (FollowResponse, Foreground),
174 (FormatBuffers, Foreground),
175 (FormatBuffersResponse, Foreground),
176 (FuzzySearchUsers, Foreground),
177 (GetChannelMembers, Foreground),
178 (GetChannelMembersResponse, Foreground),
179 (GetChannelMessages, Background),
180 (GetChannelMessagesById, Background),
181 (GetChannelMessagesResponse, Background),
182 (GetCodeActions, Background),
183 (GetCodeActionsResponse, Background),
184 (GetCompletions, Background),
185 (GetCompletionsResponse, Background),
186 (GetDefinition, Background),
187 (GetDefinitionResponse, Background),
188 (GetDocumentHighlights, Background),
189 (GetDocumentHighlightsResponse, Background),
190 (GetHover, Background),
191 (GetHoverResponse, Background),
192 (GetNotifications, Foreground),
193 (GetNotificationsResponse, Foreground),
194 (GetPrivateUserInfo, Foreground),
195 (GetPrivateUserInfoResponse, Foreground),
196 (GetProjectSymbols, Background),
197 (GetProjectSymbolsResponse, Background),
198 (GetReferences, Background),
199 (GetReferencesResponse, Background),
200 (GetTypeDefinition, Background),
201 (GetTypeDefinitionResponse, Background),
202 (GetImplementation, Background),
203 (GetImplementationResponse, Background),
204 (GetUsers, Foreground),
205 (Hello, Foreground),
206 (IncomingCall, Foreground),
207 (InlayHints, Background),
208 (InlayHintsResponse, Background),
209 (InviteChannelMember, Foreground),
210 (JoinChannel, Foreground),
211 (JoinChannelBuffer, Foreground),
212 (JoinChannelBufferResponse, Foreground),
213 (JoinChannelChat, Foreground),
214 (JoinChannelChatResponse, Foreground),
215 (JoinProject, Foreground),
216 (JoinHostedProject, Foreground),
217 (JoinProjectResponse, Foreground),
218 (JoinRoom, Foreground),
219 (JoinRoomResponse, Foreground),
220 (LanguageModelResponse, Background),
221 (LeaveChannelBuffer, Background),
222 (LeaveChannelChat, Foreground),
223 (LeaveProject, Foreground),
224 (LeaveRoom, Foreground),
225 (MarkNotificationRead, Foreground),
226 (MoveChannel, Foreground),
227 (OnTypeFormatting, Background),
228 (OnTypeFormattingResponse, Background),
229 (OpenBufferById, Background),
230 (OpenBufferByPath, Background),
231 (OpenBufferForSymbol, Background),
232 (OpenBufferForSymbolResponse, Background),
233 (OpenBufferResponse, Background),
234 (PerformRename, Background),
235 (PerformRenameResponse, Background),
236 (Ping, Foreground),
237 (PrepareRename, Background),
238 (PrepareRenameResponse, Background),
239 (ProjectEntryResponse, Foreground),
240 (RefreshInlayHints, Foreground),
241 (RejoinChannelBuffers, Foreground),
242 (RejoinChannelBuffersResponse, Foreground),
243 (RejoinRoom, Foreground),
244 (RejoinRoomResponse, Foreground),
245 (ReloadBuffers, Foreground),
246 (ReloadBuffersResponse, Foreground),
247 (RemoveChannelMember, Foreground),
248 (RemoveChannelMessage, Foreground),
249 (UpdateChannelMessage, Foreground),
250 (RemoveContact, Foreground),
251 (RemoveProjectCollaborator, Foreground),
252 (RenameChannel, Foreground),
253 (RenameChannelResponse, Foreground),
254 (RenameProjectEntry, Foreground),
255 (RequestContact, Foreground),
256 (ResolveCompletionDocumentation, Background),
257 (ResolveCompletionDocumentationResponse, Background),
258 (ResolveInlayHint, Background),
259 (ResolveInlayHintResponse, Background),
260 (RespondToChannelInvite, Foreground),
261 (RespondToContactRequest, Foreground),
262 (RoomUpdated, Foreground),
263 (SaveBuffer, Foreground),
264 (SetChannelMemberRole, Foreground),
265 (SetChannelVisibility, Foreground),
266 (SearchProject, Background),
267 (SearchProjectResponse, Background),
268 (SendChannelMessage, Background),
269 (SendChannelMessageResponse, Background),
270 (ShareProject, Foreground),
271 (ShareProjectResponse, Foreground),
272 (ShowContacts, Foreground),
273 (StartLanguageServer, Foreground),
274 (SynchronizeBuffers, Foreground),
275 (SynchronizeBuffersResponse, Foreground),
276 (Test, Foreground),
277 (Unfollow, Foreground),
278 (UnshareProject, Foreground),
279 (UpdateBuffer, Foreground),
280 (UpdateBufferFile, Foreground),
281 (UpdateChannelBuffer, Foreground),
282 (UpdateChannelBufferCollaborators, Foreground),
283 (UpdateChannels, Foreground),
284 (UpdateUserChannels, Foreground),
285 (UpdateContacts, Foreground),
286 (UpdateDiagnosticSummary, Foreground),
287 (UpdateDiffBase, Foreground),
288 (UpdateFollowers, Foreground),
289 (UpdateInviteInfo, Foreground),
290 (UpdateLanguageServer, Foreground),
291 (UpdateParticipantLocation, Foreground),
292 (UpdateProject, Foreground),
293 (UpdateProjectCollaborator, Foreground),
294 (UpdateWorktree, Foreground),
295 (UpdateWorktreeSettings, Foreground),
296 (UsersResponse, Foreground),
297 (LspExtExpandMacro, Background),
298 (LspExtExpandMacroResponse, Background),
299 (SetRoomParticipantRole, Foreground),
300 (BlameBuffer, Foreground),
301 (BlameBufferResponse, Foreground),
302 (MultiLspQuery, Background),
303 (MultiLspQueryResponse, Background),
304);
305
306request_messages!(
307 (ApplyCodeAction, ApplyCodeActionResponse),
308 (
309 ApplyCompletionAdditionalEdits,
310 ApplyCompletionAdditionalEditsResponse
311 ),
312 (Call, Ack),
313 (CancelCall, Ack),
314 (CopyProjectEntry, ProjectEntryResponse),
315 (CompleteWithLanguageModel, LanguageModelResponse),
316 (CountTokensWithLanguageModel, CountTokensResponse),
317 (CreateChannel, CreateChannelResponse),
318 (CreateProjectEntry, ProjectEntryResponse),
319 (CreateRoom, CreateRoomResponse),
320 (DeclineCall, Ack),
321 (DeleteChannel, Ack),
322 (DeleteProjectEntry, ProjectEntryResponse),
323 (ExpandProjectEntry, ExpandProjectEntryResponse),
324 (Follow, FollowResponse),
325 (FormatBuffers, FormatBuffersResponse),
326 (FuzzySearchUsers, UsersResponse),
327 (GetChannelMembers, GetChannelMembersResponse),
328 (GetChannelMessages, GetChannelMessagesResponse),
329 (GetChannelMessagesById, GetChannelMessagesResponse),
330 (GetCodeActions, GetCodeActionsResponse),
331 (GetCompletions, GetCompletionsResponse),
332 (GetDefinition, GetDefinitionResponse),
333 (GetImplementation, GetImplementationResponse),
334 (GetDocumentHighlights, GetDocumentHighlightsResponse),
335 (GetHover, GetHoverResponse),
336 (GetNotifications, GetNotificationsResponse),
337 (GetPrivateUserInfo, GetPrivateUserInfoResponse),
338 (GetProjectSymbols, GetProjectSymbolsResponse),
339 (GetReferences, GetReferencesResponse),
340 (GetTypeDefinition, GetTypeDefinitionResponse),
341 (GetUsers, UsersResponse),
342 (IncomingCall, Ack),
343 (InlayHints, InlayHintsResponse),
344 (InviteChannelMember, Ack),
345 (JoinChannel, JoinRoomResponse),
346 (JoinChannelBuffer, JoinChannelBufferResponse),
347 (JoinChannelChat, JoinChannelChatResponse),
348 (JoinHostedProject, JoinProjectResponse),
349 (JoinProject, JoinProjectResponse),
350 (JoinRoom, JoinRoomResponse),
351 (LeaveChannelBuffer, Ack),
352 (LeaveRoom, Ack),
353 (MarkNotificationRead, Ack),
354 (MoveChannel, Ack),
355 (OnTypeFormatting, OnTypeFormattingResponse),
356 (OpenBufferById, OpenBufferResponse),
357 (OpenBufferByPath, OpenBufferResponse),
358 (OpenBufferForSymbol, OpenBufferForSymbolResponse),
359 (PerformRename, PerformRenameResponse),
360 (Ping, Ack),
361 (PrepareRename, PrepareRenameResponse),
362 (RefreshInlayHints, Ack),
363 (RejoinChannelBuffers, RejoinChannelBuffersResponse),
364 (RejoinRoom, RejoinRoomResponse),
365 (ReloadBuffers, ReloadBuffersResponse),
366 (RemoveChannelMember, Ack),
367 (RemoveChannelMessage, Ack),
368 (UpdateChannelMessage, Ack),
369 (RemoveContact, Ack),
370 (RenameChannel, RenameChannelResponse),
371 (RenameProjectEntry, ProjectEntryResponse),
372 (RequestContact, Ack),
373 (
374 ResolveCompletionDocumentation,
375 ResolveCompletionDocumentationResponse
376 ),
377 (ResolveInlayHint, ResolveInlayHintResponse),
378 (RespondToChannelInvite, Ack),
379 (RespondToContactRequest, Ack),
380 (SaveBuffer, BufferSaved),
381 (SearchProject, SearchProjectResponse),
382 (SendChannelMessage, SendChannelMessageResponse),
383 (SetChannelMemberRole, Ack),
384 (SetChannelVisibility, Ack),
385 (ShareProject, ShareProjectResponse),
386 (SynchronizeBuffers, SynchronizeBuffersResponse),
387 (Test, Test),
388 (UpdateBuffer, Ack),
389 (UpdateParticipantLocation, Ack),
390 (UpdateProject, Ack),
391 (UpdateWorktree, Ack),
392 (LspExtExpandMacro, LspExtExpandMacroResponse),
393 (SetRoomParticipantRole, Ack),
394 (BlameBuffer, BlameBufferResponse),
395 (MultiLspQuery, MultiLspQueryResponse),
396);
397
398entity_messages!(
399 {project_id, ShareProject},
400 AddProjectCollaborator,
401 ApplyCodeAction,
402 ApplyCompletionAdditionalEdits,
403 BlameBuffer,
404 BufferReloaded,
405 BufferSaved,
406 CopyProjectEntry,
407 CreateBufferForPeer,
408 CreateProjectEntry,
409 DeleteProjectEntry,
410 ExpandProjectEntry,
411 FormatBuffers,
412 GetCodeActions,
413 GetCompletions,
414 GetDefinition,
415 GetImplementation,
416 GetDocumentHighlights,
417 GetHover,
418 GetProjectSymbols,
419 GetReferences,
420 GetTypeDefinition,
421 InlayHints,
422 JoinProject,
423 LeaveProject,
424 MultiLspQuery,
425 OnTypeFormatting,
426 OpenBufferById,
427 OpenBufferByPath,
428 OpenBufferForSymbol,
429 PerformRename,
430 PrepareRename,
431 RefreshInlayHints,
432 ReloadBuffers,
433 RemoveProjectCollaborator,
434 RenameProjectEntry,
435 ResolveCompletionDocumentation,
436 ResolveInlayHint,
437 SaveBuffer,
438 SearchProject,
439 StartLanguageServer,
440 SynchronizeBuffers,
441 UnshareProject,
442 UpdateBuffer,
443 UpdateBufferFile,
444 UpdateDiagnosticSummary,
445 UpdateDiffBase,
446 UpdateLanguageServer,
447 UpdateProject,
448 UpdateProjectCollaborator,
449 UpdateWorktree,
450 UpdateWorktreeSettings,
451 LspExtExpandMacro,
452);
453
454entity_messages!(
455 {channel_id, Channel},
456 ChannelMessageSent,
457 ChannelMessageUpdate,
458 RemoveChannelMessage,
459 UpdateChannelMessage,
460 UpdateChannelBuffer,
461 UpdateChannelBufferCollaborators,
462);
463
464const KIB: usize = 1024;
465const MIB: usize = KIB * 1024;
466const MAX_BUFFER_LEN: usize = MIB;
467
468/// A stream of protobuf messages.
469pub struct MessageStream<S> {
470 stream: S,
471 encoding_buffer: Vec<u8>,
472}
473
474#[allow(clippy::large_enum_variant)]
475#[derive(Debug)]
476pub enum Message {
477 Envelope(Envelope),
478 Ping,
479 Pong,
480}
481
482impl<S> MessageStream<S> {
483 pub fn new(stream: S) -> Self {
484 Self {
485 stream,
486 encoding_buffer: Vec::new(),
487 }
488 }
489
490 pub fn inner_mut(&mut self) -> &mut S {
491 &mut self.stream
492 }
493}
494
495impl<S> MessageStream<S>
496where
497 S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
498{
499 pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
500 #[cfg(any(test, feature = "test-support"))]
501 const COMPRESSION_LEVEL: i32 = -7;
502
503 #[cfg(not(any(test, feature = "test-support")))]
504 const COMPRESSION_LEVEL: i32 = 4;
505
506 match message {
507 Message::Envelope(message) => {
508 self.encoding_buffer.reserve(message.encoded_len());
509 message
510 .encode(&mut self.encoding_buffer)
511 .map_err(io::Error::from)?;
512 let buffer =
513 zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
514 .unwrap();
515
516 self.encoding_buffer.clear();
517 self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
518 self.stream.send(WebSocketMessage::Binary(buffer)).await?;
519 }
520 Message::Ping => {
521 self.stream
522 .send(WebSocketMessage::Ping(Default::default()))
523 .await?;
524 }
525 Message::Pong => {
526 self.stream
527 .send(WebSocketMessage::Pong(Default::default()))
528 .await?;
529 }
530 }
531
532 Ok(())
533 }
534}
535
536impl<S> MessageStream<S>
537where
538 S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
539{
540 pub async fn read(&mut self) -> Result<(Message, Instant), anyhow::Error> {
541 while let Some(bytes) = self.stream.next().await {
542 let received_at = Instant::now();
543 match bytes? {
544 WebSocketMessage::Binary(bytes) => {
545 zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
546 let envelope = Envelope::decode(self.encoding_buffer.as_slice())
547 .map_err(io::Error::from)?;
548
549 self.encoding_buffer.clear();
550 self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
551 return Ok((Message::Envelope(envelope), received_at));
552 }
553 WebSocketMessage::Ping(_) => return Ok((Message::Ping, received_at)),
554 WebSocketMessage::Pong(_) => return Ok((Message::Pong, received_at)),
555 WebSocketMessage::Close(_) => break,
556 _ => {}
557 }
558 }
559 Err(anyhow!("connection closed"))
560 }
561}
562
563impl From<Timestamp> for SystemTime {
564 fn from(val: Timestamp) -> Self {
565 UNIX_EPOCH
566 .checked_add(Duration::new(val.seconds, val.nanos))
567 .unwrap()
568 }
569}
570
571impl From<SystemTime> for Timestamp {
572 fn from(time: SystemTime) -> Self {
573 let duration = time.duration_since(UNIX_EPOCH).unwrap();
574 Self {
575 seconds: duration.as_secs(),
576 nanos: duration.subsec_nanos(),
577 }
578 }
579}
580
581impl From<u128> for Nonce {
582 fn from(nonce: u128) -> Self {
583 let upper_half = (nonce >> 64) as u64;
584 let lower_half = nonce as u64;
585 Self {
586 upper_half,
587 lower_half,
588 }
589 }
590}
591
592impl From<Nonce> for u128 {
593 fn from(nonce: Nonce) -> Self {
594 let upper_half = (nonce.upper_half as u128) << 64;
595 let lower_half = nonce.lower_half as u128;
596 upper_half | lower_half
597 }
598}
599
600pub fn split_worktree_update(
601 mut message: UpdateWorktree,
602 max_chunk_size: usize,
603) -> impl Iterator<Item = UpdateWorktree> {
604 let mut done_files = false;
605
606 let mut repository_map = message
607 .updated_repositories
608 .into_iter()
609 .map(|repo| (repo.work_directory_id, repo))
610 .collect::<HashMap<_, _>>();
611
612 iter::from_fn(move || {
613 if done_files {
614 return None;
615 }
616
617 let updated_entries_chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
618 let updated_entries: Vec<_> = message
619 .updated_entries
620 .drain(..updated_entries_chunk_size)
621 .collect();
622
623 let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size);
624 let removed_entries = message
625 .removed_entries
626 .drain(..removed_entries_chunk_size)
627 .collect();
628
629 done_files = message.updated_entries.is_empty() && message.removed_entries.is_empty();
630
631 let mut updated_repositories = Vec::new();
632
633 if !repository_map.is_empty() {
634 for entry in &updated_entries {
635 if let Some(repo) = repository_map.remove(&entry.id) {
636 updated_repositories.push(repo)
637 }
638 }
639 }
640
641 let removed_repositories = if done_files {
642 mem::take(&mut message.removed_repositories)
643 } else {
644 Default::default()
645 };
646
647 if done_files {
648 updated_repositories.extend(mem::take(&mut repository_map).into_values());
649 }
650
651 Some(UpdateWorktree {
652 project_id: message.project_id,
653 worktree_id: message.worktree_id,
654 root_name: message.root_name.clone(),
655 abs_path: message.abs_path.clone(),
656 updated_entries,
657 removed_entries,
658 scan_id: message.scan_id,
659 is_last_update: done_files && message.is_last_update,
660 updated_repositories,
661 removed_repositories,
662 })
663 })
664}
665
666#[cfg(test)]
667mod tests {
668 use super::*;
669
670 #[gpui::test]
671 async fn test_buffer_size() {
672 let (tx, rx) = futures::channel::mpsc::unbounded();
673 let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
674 sink.write(Message::Envelope(Envelope {
675 payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
676 root_name: "abcdefg".repeat(10),
677 ..Default::default()
678 })),
679 ..Default::default()
680 }))
681 .await
682 .unwrap();
683 assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
684 sink.write(Message::Envelope(Envelope {
685 payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
686 root_name: "abcdefg".repeat(1000000),
687 ..Default::default()
688 })),
689 ..Default::default()
690 }))
691 .await
692 .unwrap();
693 assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
694
695 let mut stream = MessageStream::new(rx.map(anyhow::Ok));
696 stream.read().await.unwrap();
697 assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
698 stream.read().await.unwrap();
699 assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
700 }
701
702 #[gpui::test]
703 fn test_converting_peer_id_from_and_to_u64() {
704 let peer_id = PeerId {
705 owner_id: 10,
706 id: 3,
707 };
708 assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
709 let peer_id = PeerId {
710 owner_id: u32::MAX,
711 id: 3,
712 };
713 assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
714 let peer_id = PeerId {
715 owner_id: 10,
716 id: u32::MAX,
717 };
718 assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
719 let peer_id = PeerId {
720 owner_id: u32::MAX,
721 id: u32::MAX,
722 };
723 assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
724 }
725}