1#![allow(non_snake_case)]
2
3use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
4use anyhow::{anyhow, Result};
5use async_tungstenite::tungstenite::Message as WebSocketMessage;
6use collections::HashMap;
7use futures::{SinkExt as _, StreamExt as _};
8use prost::Message as _;
9use serde::Serialize;
10use std::any::{Any, TypeId};
11use std::time::Instant;
12use std::{
13 cmp,
14 fmt::Debug,
15 io, iter,
16 time::{Duration, SystemTime, UNIX_EPOCH},
17};
18use std::{fmt, mem};
19
20include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
21
/// A message type that can be wrapped in, and recovered from, an [`Envelope`].
///
/// NOTE(review): implementations are presumably generated by the `messages!`
/// invocation in this file (the macro itself is imported from the parent
/// module) — confirm there.
pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
    /// Name of the message type; this is what
    /// [`AnyTypedEnvelope::payload_type_name`] returns for a `TypedEnvelope<Self>`.
    const NAME: &'static str;
    /// Priority class of the message type; [`AnyTypedEnvelope::is_background`]
    /// reports whether this is [`MessagePriority::Background`].
    const PRIORITY: MessagePriority;
    /// Wraps `self` in an [`Envelope`].
    ///
    /// NOTE(review): `id`, `responding_to` and `original_sender_id` presumably
    /// populate the envelope fields of the same names — confirm in the
    /// generated implementation.
    fn into_envelope(
        self,
        id: u32,
        responding_to: Option<u32>,
        original_sender_id: Option<PeerId>,
    ) -> Envelope;
    /// Tries to extract a message of this type from `envelope`.
    ///
    /// NOTE(review): presumably returns `None` when the envelope's payload is
    /// of a different message type — confirm in the generated implementation.
    fn from_envelope(envelope: Envelope) -> Option<Self>;
}
33
/// An [`EnvelopedMessage`] that carries the id of a remote entity.
///
/// NOTE(review): implementations are presumably generated by the
/// `entity_messages!` invocations in this file — confirm in the macro.
pub trait EntityMessage: EnvelopedMessage {
    /// Type associated with the message.
    ///
    /// NOTE(review): presumably the type named in the leading braces of the
    /// `entity_messages!` invocation — confirm in the macro.
    type Entity;
    /// Returns the id of the remote entity this message refers to.
    fn remote_entity_id(&self) -> u64;
}
38
/// An [`EnvelopedMessage`] that has an associated response message type.
///
/// NOTE(review): implementations are presumably generated from the
/// `(Request, Response)` pairs passed to `request_messages!` in this file —
/// confirm in the macro.
pub trait RequestMessage: EnvelopedMessage {
    /// Message type associated with this request as its response.
    type Response: EnvelopedMessage;
}
42
/// Type-erased view of an envelope, implemented in this file for every
/// `TypedEnvelope<T>` whose payload `T` is an [`EnvelopedMessage`].
pub trait AnyTypedEnvelope: 'static + Send + Sync {
    /// [`TypeId`] of the payload message type.
    fn payload_type_id(&self) -> TypeId;
    /// Name of the payload message type (`EnvelopedMessage::NAME`).
    fn payload_type_name(&self) -> &'static str;
    /// Borrows the envelope as `&dyn Any` so it can be downcast.
    fn as_any(&self) -> &dyn Any;
    /// Converts the boxed envelope into a boxed `Any` so it can be downcast by value.
    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
    /// Whether the payload type's priority is [`MessagePriority::Background`].
    fn is_background(&self) -> bool;
    /// The envelope's `original_sender_id`, if it has one.
    fn original_sender_id(&self) -> Option<PeerId>;
    /// The envelope's `sender_id`.
    fn sender_id(&self) -> ConnectionId;
    /// The envelope's `message_id`.
    fn message_id(&self) -> u32;
}
53
/// Priority class attached to each message type via
/// `EnvelopedMessage::PRIORITY`.
///
/// NOTE(review): how the two classes are actually scheduled is decided by code
/// outside this file.
pub enum MessagePriority {
    /// Priority class for which `AnyTypedEnvelope::is_background` returns `false`.
    Foreground,
    /// Priority class for which `AnyTypedEnvelope::is_background` returns `true`.
    Background,
}
58
59impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
60 fn payload_type_id(&self) -> TypeId {
61 TypeId::of::<T>()
62 }
63
64 fn payload_type_name(&self) -> &'static str {
65 T::NAME
66 }
67
68 fn as_any(&self) -> &dyn Any {
69 self
70 }
71
72 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
73 self
74 }
75
76 fn is_background(&self) -> bool {
77 matches!(T::PRIORITY, MessagePriority::Background)
78 }
79
80 fn original_sender_id(&self) -> Option<PeerId> {
81 self.original_sender_id
82 }
83
84 fn sender_id(&self) -> ConnectionId {
85 self.sender_id
86 }
87
88 fn message_id(&self) -> u32 {
89 self.message_id
90 }
91}
92
93impl PeerId {
94 pub fn from_u64(peer_id: u64) -> Self {
95 let owner_id = (peer_id >> 32) as u32;
96 let id = peer_id as u32;
97 Self { owner_id, id }
98 }
99
100 pub fn as_u64(self) -> u64 {
101 ((self.owner_id as u64) << 32) | (self.id as u64)
102 }
103}
104
// Marker impl: lets `PeerId` values be copied implicitly, e.g. when a method
// such as `as_u64` takes `self` by value.
impl Copy for PeerId {}
106
// Marker impl: declares `PeerId` equality to be a full equivalence relation,
// which the `Ord` implementation in this file requires.
impl Eq for PeerId {}
108
109impl Ord for PeerId {
110 fn cmp(&self, other: &Self) -> cmp::Ordering {
111 self.owner_id
112 .cmp(&other.owner_id)
113 .then_with(|| self.id.cmp(&other.id))
114 }
115}
116
117impl PartialOrd for PeerId {
118 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
119 Some(self.cmp(other))
120 }
121}
122
123impl std::hash::Hash for PeerId {
124 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
125 self.owner_id.hash(state);
126 self.id.hash(state);
127 }
128}
129
130impl fmt::Display for PeerId {
131 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
132 write!(f, "{}/{}", self.owner_id, self.id)
133 }
134}
135
// Lists every message type handled here, paired with a `MessagePriority`
// variant (`Foreground` or `Background`).
// NOTE(review): `messages!` is imported from the parent module; presumably it
// generates the `EnvelopedMessage` impl (including `PRIORITY`) for each pair —
// confirm in the macro definition.
messages!(
    (Ack, Foreground),
    (AckBufferOperation, Background),
    (AckChannelMessage, Background),
    (AddNotification, Foreground),
    (AddProjectCollaborator, Foreground),
    (ApplyCodeAction, Background),
    (ApplyCodeActionResponse, Background),
    (ApplyCompletionAdditionalEdits, Background),
    (ApplyCompletionAdditionalEditsResponse, Background),
    (BufferReloaded, Foreground),
    (BufferSaved, Foreground),
    (Call, Foreground),
    (CallCanceled, Foreground),
    (CancelCall, Foreground),
    (ChannelMessageSent, Foreground),
    (ChannelMessageUpdate, Foreground),
    (CompleteWithLanguageModel, Background),
    (CopyProjectEntry, Foreground),
    (CountTokensWithLanguageModel, Background),
    (CountTokensResponse, Background),
    (CreateBufferForPeer, Foreground),
    (CreateChannel, Foreground),
    (CreateChannelResponse, Foreground),
    (CreateProjectEntry, Foreground),
    (CreateRoom, Foreground),
    (CreateRoomResponse, Foreground),
    (DeclineCall, Foreground),
    (DeleteChannel, Foreground),
    (DeleteNotification, Foreground),
    (DeleteProjectEntry, Foreground),
    (EndStream, Foreground),
    (Error, Foreground),
    (ExpandProjectEntry, Foreground),
    (ExpandProjectEntryResponse, Foreground),
    (Follow, Foreground),
    (FollowResponse, Foreground),
    (FormatBuffers, Foreground),
    (FormatBuffersResponse, Foreground),
    (FuzzySearchUsers, Foreground),
    (GetChannelMembers, Foreground),
    (GetChannelMembersResponse, Foreground),
    (GetChannelMessages, Background),
    (GetChannelMessagesById, Background),
    (GetChannelMessagesResponse, Background),
    (GetCodeActions, Background),
    (GetCodeActionsResponse, Background),
    (GetCompletions, Background),
    (GetCompletionsResponse, Background),
    (GetDefinition, Background),
    (GetDefinitionResponse, Background),
    (GetDocumentHighlights, Background),
    (GetDocumentHighlightsResponse, Background),
    (GetHover, Background),
    (GetHoverResponse, Background),
    (GetNotifications, Foreground),
    (GetNotificationsResponse, Foreground),
    (GetPrivateUserInfo, Foreground),
    (GetPrivateUserInfoResponse, Foreground),
    (GetProjectSymbols, Background),
    (GetProjectSymbolsResponse, Background),
    (GetReferences, Background),
    (GetReferencesResponse, Background),
    (GetTypeDefinition, Background),
    (GetTypeDefinitionResponse, Background),
    (GetImplementation, Background),
    (GetImplementationResponse, Background),
    (GetUsers, Foreground),
    (Hello, Foreground),
    (IncomingCall, Foreground),
    (InlayHints, Background),
    (InlayHintsResponse, Background),
    (InviteChannelMember, Foreground),
    (JoinChannel, Foreground),
    (JoinChannelBuffer, Foreground),
    (JoinChannelBufferResponse, Foreground),
    (JoinChannelChat, Foreground),
    (JoinChannelChatResponse, Foreground),
    (JoinProject, Foreground),
    (JoinHostedProject, Foreground),
    (JoinProjectResponse, Foreground),
    (JoinRoom, Foreground),
    (JoinRoomResponse, Foreground),
    (LanguageModelResponse, Background),
    (LeaveChannelBuffer, Background),
    (LeaveChannelChat, Foreground),
    (LeaveProject, Foreground),
    (LeaveRoom, Foreground),
    (MarkNotificationRead, Foreground),
    (MoveChannel, Foreground),
    (OnTypeFormatting, Background),
    (OnTypeFormattingResponse, Background),
    (OpenBufferById, Background),
    (OpenBufferByPath, Background),
    (OpenBufferForSymbol, Background),
    (OpenBufferForSymbolResponse, Background),
    (OpenBufferResponse, Background),
    (PerformRename, Background),
    (PerformRenameResponse, Background),
    (Ping, Foreground),
    (PrepareRename, Background),
    (PrepareRenameResponse, Background),
    (ProjectEntryResponse, Foreground),
    (RefreshInlayHints, Foreground),
    (RejoinChannelBuffers, Foreground),
    (RejoinChannelBuffersResponse, Foreground),
    (RejoinRoom, Foreground),
    (RejoinRoomResponse, Foreground),
    (ReloadBuffers, Foreground),
    (ReloadBuffersResponse, Foreground),
    (RemoveChannelMember, Foreground),
    (RemoveChannelMessage, Foreground),
    (UpdateChannelMessage, Foreground),
    (RemoveContact, Foreground),
    (RemoveProjectCollaborator, Foreground),
    (RenameChannel, Foreground),
    (RenameChannelResponse, Foreground),
    (RenameProjectEntry, Foreground),
    (RequestContact, Foreground),
    (ResolveCompletionDocumentation, Background),
    (ResolveCompletionDocumentationResponse, Background),
    (ResolveInlayHint, Background),
    (ResolveInlayHintResponse, Background),
    (RespondToChannelInvite, Foreground),
    (RespondToContactRequest, Foreground),
    (RoomUpdated, Foreground),
    (SaveBuffer, Foreground),
    (SetChannelMemberRole, Foreground),
    (SetChannelVisibility, Foreground),
    (SearchProject, Background),
    (SearchProjectResponse, Background),
    (SendChannelMessage, Background),
    (SendChannelMessageResponse, Background),
    (ShareProject, Foreground),
    (ShareProjectResponse, Foreground),
    (ShowContacts, Foreground),
    (StartLanguageServer, Foreground),
    (SynchronizeBuffers, Foreground),
    (SynchronizeBuffersResponse, Foreground),
    (Test, Foreground),
    (Unfollow, Foreground),
    (UnshareProject, Foreground),
    (UpdateBuffer, Foreground),
    (UpdateBufferFile, Foreground),
    (UpdateChannelBuffer, Foreground),
    (UpdateChannelBufferCollaborators, Foreground),
    (UpdateChannels, Foreground),
    (UpdateUserChannels, Foreground),
    (UpdateContacts, Foreground),
    (UpdateDiagnosticSummary, Foreground),
    (UpdateDiffBase, Foreground),
    (UpdateFollowers, Foreground),
    (UpdateInviteInfo, Foreground),
    (UpdateLanguageServer, Foreground),
    (UpdateParticipantLocation, Foreground),
    (UpdateProject, Foreground),
    (UpdateProjectCollaborator, Foreground),
    (UpdateWorktree, Foreground),
    (UpdateWorktreeSettings, Foreground),
    (UsersResponse, Foreground),
    (LspExtExpandMacro, Background),
    (LspExtExpandMacroResponse, Background),
    (SetRoomParticipantRole, Foreground),
    (BlameBuffer, Foreground),
    (BlameBufferResponse, Foreground),
);
302
// Pairs each request message type with the message type used as its response,
// written as `(Request, Response)`.
// NOTE(review): `request_messages!` is imported from the parent module;
// presumably it generates a `RequestMessage` impl with
// `type Response = <second element>` for each pair — confirm in the macro
// definition.
request_messages!(
    (ApplyCodeAction, ApplyCodeActionResponse),
    (
        ApplyCompletionAdditionalEdits,
        ApplyCompletionAdditionalEditsResponse
    ),
    (Call, Ack),
    (CancelCall, Ack),
    (CopyProjectEntry, ProjectEntryResponse),
    (CompleteWithLanguageModel, LanguageModelResponse),
    (CountTokensWithLanguageModel, CountTokensResponse),
    (CreateChannel, CreateChannelResponse),
    (CreateProjectEntry, ProjectEntryResponse),
    (CreateRoom, CreateRoomResponse),
    (DeclineCall, Ack),
    (DeleteChannel, Ack),
    (DeleteProjectEntry, ProjectEntryResponse),
    (ExpandProjectEntry, ExpandProjectEntryResponse),
    (Follow, FollowResponse),
    (FormatBuffers, FormatBuffersResponse),
    (FuzzySearchUsers, UsersResponse),
    (GetChannelMembers, GetChannelMembersResponse),
    (GetChannelMessages, GetChannelMessagesResponse),
    (GetChannelMessagesById, GetChannelMessagesResponse),
    (GetCodeActions, GetCodeActionsResponse),
    (GetCompletions, GetCompletionsResponse),
    (GetDefinition, GetDefinitionResponse),
    (GetImplementation, GetImplementationResponse),
    (GetDocumentHighlights, GetDocumentHighlightsResponse),
    (GetHover, GetHoverResponse),
    (GetNotifications, GetNotificationsResponse),
    (GetPrivateUserInfo, GetPrivateUserInfoResponse),
    (GetProjectSymbols, GetProjectSymbolsResponse),
    (GetReferences, GetReferencesResponse),
    (GetTypeDefinition, GetTypeDefinitionResponse),
    (GetUsers, UsersResponse),
    (IncomingCall, Ack),
    (InlayHints, InlayHintsResponse),
    (InviteChannelMember, Ack),
    (JoinChannel, JoinRoomResponse),
    (JoinChannelBuffer, JoinChannelBufferResponse),
    (JoinChannelChat, JoinChannelChatResponse),
    (JoinHostedProject, JoinProjectResponse),
    (JoinProject, JoinProjectResponse),
    (JoinRoom, JoinRoomResponse),
    (LeaveChannelBuffer, Ack),
    (LeaveRoom, Ack),
    (MarkNotificationRead, Ack),
    (MoveChannel, Ack),
    (OnTypeFormatting, OnTypeFormattingResponse),
    (OpenBufferById, OpenBufferResponse),
    (OpenBufferByPath, OpenBufferResponse),
    (OpenBufferForSymbol, OpenBufferForSymbolResponse),
    (PerformRename, PerformRenameResponse),
    (Ping, Ack),
    (PrepareRename, PrepareRenameResponse),
    (RefreshInlayHints, Ack),
    (RejoinChannelBuffers, RejoinChannelBuffersResponse),
    (RejoinRoom, RejoinRoomResponse),
    (ReloadBuffers, ReloadBuffersResponse),
    (RemoveChannelMember, Ack),
    (RemoveChannelMessage, Ack),
    (UpdateChannelMessage, Ack),
    (RemoveContact, Ack),
    (RenameChannel, RenameChannelResponse),
    (RenameProjectEntry, ProjectEntryResponse),
    (RequestContact, Ack),
    (
        ResolveCompletionDocumentation,
        ResolveCompletionDocumentationResponse
    ),
    (ResolveInlayHint, ResolveInlayHintResponse),
    (RespondToChannelInvite, Ack),
    (RespondToContactRequest, Ack),
    (SaveBuffer, BufferSaved),
    (SearchProject, SearchProjectResponse),
    (SendChannelMessage, SendChannelMessageResponse),
    (SetChannelMemberRole, Ack),
    (SetChannelVisibility, Ack),
    (ShareProject, ShareProjectResponse),
    (SynchronizeBuffers, SynchronizeBuffersResponse),
    (Test, Test),
    (UpdateBuffer, Ack),
    (UpdateParticipantLocation, Ack),
    (UpdateProject, Ack),
    (UpdateWorktree, Ack),
    (LspExtExpandMacro, LspExtExpandMacroResponse),
    (SetRoomParticipantRole, Ack),
    (BlameBuffer, BlameBufferResponse),
);
393
// Message types associated with the `{project_id, ShareProject}` pair given in
// the leading braces.
// NOTE(review): `entity_messages!` is imported from the parent module;
// presumably it generates an `EntityMessage` impl for each listed type whose
// `remote_entity_id` reads the message's `project_id` field and whose `Entity`
// is `ShareProject` — confirm in the macro definition.
entity_messages!(
    {project_id, ShareProject},
    AddProjectCollaborator,
    ApplyCodeAction,
    ApplyCompletionAdditionalEdits,
    BlameBuffer,
    BufferReloaded,
    BufferSaved,
    CopyProjectEntry,
    CreateBufferForPeer,
    CreateProjectEntry,
    DeleteProjectEntry,
    ExpandProjectEntry,
    FormatBuffers,
    GetCodeActions,
    GetCompletions,
    GetDefinition,
    GetImplementation,
    GetDocumentHighlights,
    GetHover,
    GetProjectSymbols,
    GetReferences,
    GetTypeDefinition,
    InlayHints,
    JoinProject,
    LeaveProject,
    OnTypeFormatting,
    OpenBufferById,
    OpenBufferByPath,
    OpenBufferForSymbol,
    PerformRename,
    PrepareRename,
    RefreshInlayHints,
    ReloadBuffers,
    RemoveProjectCollaborator,
    RenameProjectEntry,
    ResolveCompletionDocumentation,
    ResolveInlayHint,
    SaveBuffer,
    SearchProject,
    StartLanguageServer,
    SynchronizeBuffers,
    UnshareProject,
    UpdateBuffer,
    UpdateBufferFile,
    UpdateDiagnosticSummary,
    UpdateDiffBase,
    UpdateLanguageServer,
    UpdateProject,
    UpdateProjectCollaborator,
    UpdateWorktree,
    UpdateWorktreeSettings,
    LspExtExpandMacro,
);
448
// Message types associated with the `{channel_id, Channel}` pair given in the
// leading braces.
// NOTE(review): presumably `entity_messages!` generates an `EntityMessage`
// impl for each listed type whose `remote_entity_id` reads the message's
// `channel_id` field and whose `Entity` is `Channel` — confirm in the macro
// definition (imported from the parent module).
entity_messages!(
    {channel_id, Channel},
    ChannelMessageSent,
    ChannelMessageUpdate,
    RemoveChannelMessage,
    UpdateChannelMessage,
    UpdateChannelBuffer,
    UpdateChannelBufferCollaborators,
);
458
/// 1024; used as a byte count (one kibibyte).
const KIB: usize = 1024;
/// 1024 * 1024; used as a byte count (one mebibyte).
const MIB: usize = KIB * 1024;
/// Capacity, in bytes, that `MessageStream`'s scratch buffer is shrunk back to
/// (via `Vec::shrink_to`) after each envelope is written or read.
const MAX_BUFFER_LEN: usize = MIB;
462
/// A stream of protobuf messages.
///
/// Wraps an underlying WebSocket sink and/or stream `S`; envelopes are
/// protobuf-encoded and zstd-compressed on `write`, and decompressed and
/// decoded on `read`.
pub struct MessageStream<S> {
    /// The wrapped transport.
    stream: S,
    /// Scratch buffer holding the uncompressed protobuf bytes of the envelope
    /// currently being written or read; cleared and shrunk to at most
    /// `MAX_BUFFER_LEN` capacity after each envelope.
    encoding_buffer: Vec<u8>,
}
468
/// A message exchanged through a `MessageStream`.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum Message {
    /// A protobuf envelope; transported as a binary WebSocket message.
    Envelope(Envelope),
    /// Corresponds to a WebSocket ping message.
    Ping,
    /// Corresponds to a WebSocket pong message.
    Pong,
}
476
477impl<S> MessageStream<S> {
478 pub fn new(stream: S) -> Self {
479 Self {
480 stream,
481 encoding_buffer: Vec::new(),
482 }
483 }
484
485 pub fn inner_mut(&mut self) -> &mut S {
486 &mut self.stream
487 }
488}
489
490impl<S> MessageStream<S>
491where
492 S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
493{
494 pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
495 #[cfg(any(test, feature = "test-support"))]
496 const COMPRESSION_LEVEL: i32 = -7;
497
498 #[cfg(not(any(test, feature = "test-support")))]
499 const COMPRESSION_LEVEL: i32 = 4;
500
501 match message {
502 Message::Envelope(message) => {
503 self.encoding_buffer.reserve(message.encoded_len());
504 message
505 .encode(&mut self.encoding_buffer)
506 .map_err(io::Error::from)?;
507 let buffer =
508 zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
509 .unwrap();
510
511 self.encoding_buffer.clear();
512 self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
513 self.stream.send(WebSocketMessage::Binary(buffer)).await?;
514 }
515 Message::Ping => {
516 self.stream
517 .send(WebSocketMessage::Ping(Default::default()))
518 .await?;
519 }
520 Message::Pong => {
521 self.stream
522 .send(WebSocketMessage::Pong(Default::default()))
523 .await?;
524 }
525 }
526
527 Ok(())
528 }
529}
530
531impl<S> MessageStream<S>
532where
533 S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
534{
535 pub async fn read(&mut self) -> Result<(Message, Instant), anyhow::Error> {
536 while let Some(bytes) = self.stream.next().await {
537 let received_at = Instant::now();
538 match bytes? {
539 WebSocketMessage::Binary(bytes) => {
540 zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
541 let envelope = Envelope::decode(self.encoding_buffer.as_slice())
542 .map_err(io::Error::from)?;
543
544 self.encoding_buffer.clear();
545 self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
546 return Ok((Message::Envelope(envelope), received_at));
547 }
548 WebSocketMessage::Ping(_) => return Ok((Message::Ping, received_at)),
549 WebSocketMessage::Pong(_) => return Ok((Message::Pong, received_at)),
550 WebSocketMessage::Close(_) => break,
551 _ => {}
552 }
553 }
554 Err(anyhow!("connection closed"))
555 }
556}
557
558impl From<Timestamp> for SystemTime {
559 fn from(val: Timestamp) -> Self {
560 UNIX_EPOCH
561 .checked_add(Duration::new(val.seconds, val.nanos))
562 .unwrap()
563 }
564}
565
566impl From<SystemTime> for Timestamp {
567 fn from(time: SystemTime) -> Self {
568 let duration = time.duration_since(UNIX_EPOCH).unwrap();
569 Self {
570 seconds: duration.as_secs(),
571 nanos: duration.subsec_nanos(),
572 }
573 }
574}
575
576impl From<u128> for Nonce {
577 fn from(nonce: u128) -> Self {
578 let upper_half = (nonce >> 64) as u64;
579 let lower_half = nonce as u64;
580 Self {
581 upper_half,
582 lower_half,
583 }
584 }
585}
586
587impl From<Nonce> for u128 {
588 fn from(nonce: Nonce) -> Self {
589 let upper_half = (nonce.upper_half as u128) << 64;
590 let lower_half = nonce.lower_half as u128;
591 upper_half | lower_half
592 }
593}
594
595pub fn split_worktree_update(
596 mut message: UpdateWorktree,
597 max_chunk_size: usize,
598) -> impl Iterator<Item = UpdateWorktree> {
599 let mut done_files = false;
600
601 let mut repository_map = message
602 .updated_repositories
603 .into_iter()
604 .map(|repo| (repo.work_directory_id, repo))
605 .collect::<HashMap<_, _>>();
606
607 iter::from_fn(move || {
608 if done_files {
609 return None;
610 }
611
612 let updated_entries_chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
613 let updated_entries: Vec<_> = message
614 .updated_entries
615 .drain(..updated_entries_chunk_size)
616 .collect();
617
618 let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size);
619 let removed_entries = message
620 .removed_entries
621 .drain(..removed_entries_chunk_size)
622 .collect();
623
624 done_files = message.updated_entries.is_empty() && message.removed_entries.is_empty();
625
626 let mut updated_repositories = Vec::new();
627
628 if !repository_map.is_empty() {
629 for entry in &updated_entries {
630 if let Some(repo) = repository_map.remove(&entry.id) {
631 updated_repositories.push(repo)
632 }
633 }
634 }
635
636 let removed_repositories = if done_files {
637 mem::take(&mut message.removed_repositories)
638 } else {
639 Default::default()
640 };
641
642 if done_files {
643 updated_repositories.extend(mem::take(&mut repository_map).into_values());
644 }
645
646 Some(UpdateWorktree {
647 project_id: message.project_id,
648 worktree_id: message.worktree_id,
649 root_name: message.root_name.clone(),
650 abs_path: message.abs_path.clone(),
651 updated_entries,
652 removed_entries,
653 scan_id: message.scan_id,
654 is_last_update: done_files && message.is_last_update,
655 updated_repositories,
656 removed_repositories,
657 })
658 })
659}
660
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an envelope message carrying an `UpdateWorktree` whose
    /// `root_name` is "abcdefg" repeated `repeats` times.
    fn update_worktree_message(repeats: usize) -> Message {
        let update = UpdateWorktree {
            root_name: "abcdefg".repeat(repeats),
            ..Default::default()
        };
        Message::Envelope(Envelope {
            payload: Some(envelope::Payload::UpdateWorktree(update)),
            ..Default::default()
        })
    }

    /// The scratch buffer must never retain more than `MAX_BUFFER_LEN` bytes of
    /// capacity after a write or a read, for both small and large payloads.
    #[gpui::test]
    async fn test_buffer_size() {
        let (tx, rx) = futures::channel::mpsc::unbounded();

        let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
        for repeats in [10, 1000000] {
            sink.write(update_worktree_message(repeats)).await.unwrap();
            assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
        }

        let mut stream = MessageStream::new(rx.map(anyhow::Ok));
        for _ in 0..2 {
            stream.read().await.unwrap();
            assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
        }
    }

    /// Packing a `PeerId` into a `u64` and unpacking it again is lossless,
    /// including at the extremes of both fields.
    #[gpui::test]
    fn test_converting_peer_id_from_and_to_u64() {
        let cases = [
            (10, 3),
            (u32::MAX, 3),
            (10, u32::MAX),
            (u32::MAX, u32::MAX),
        ];
        for (owner_id, id) in cases {
            let peer_id = PeerId { owner_id, id };
            assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
        }
    }
}