1#![allow(non_snake_case)]
2
3use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
4use anyhow::{anyhow, Result};
5use async_tungstenite::tungstenite::Message as WebSocketMessage;
6use collections::HashMap;
7use futures::{SinkExt as _, StreamExt as _};
8use prost::Message as _;
9use serde::Serialize;
10use std::any::{Any, TypeId};
11use std::time::Instant;
12use std::{
13 cmp,
14 fmt::Debug,
15 io, iter,
16 time::{Duration, SystemTime, UNIX_EPOCH},
17};
18use std::{fmt, mem};
19
20include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
21
/// A protobuf message type that can be carried as the payload of an [`Envelope`].
///
/// Implementations are not written in this file; presumably they are generated
/// by the `messages!` macro invoked below — confirm against the macro definition.
pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
    /// Name of the message type; surfaced through
    /// [`AnyTypedEnvelope::payload_type_name`].
    const NAME: &'static str;
    /// Priority class of the message; [`AnyTypedEnvelope::is_background`] reports
    /// whether this is [`MessagePriority::Background`].
    const PRIORITY: MessagePriority;
    /// Consumes the message and wraps it in an [`Envelope`].
    ///
    /// `id` is the id given to the envelope. `responding_to` presumably names the
    /// message this one answers, and `original_sender_id` the peer that first sent
    /// it when it is being forwarded — verify against the generated implementation.
    fn into_envelope(
        self,
        id: u32,
        responding_to: Option<u32>,
        original_sender_id: Option<PeerId>,
    ) -> Envelope;
    /// Tries to extract a message of this type from `envelope`.
    ///
    /// Presumably returns `None` when the envelope carries a different payload
    /// type — verify against the generated implementation.
    fn from_envelope(envelope: Envelope) -> Option<Self>;
}
33
/// An [`EnvelopedMessage`] that is addressed to a particular remote entity.
pub trait EntityMessage: EnvelopedMessage {
    /// Type associated with the messages of this kind (for example the
    /// `ShareProject` / `Channel` names passed to `entity_messages!` below).
    type Entity;
    /// Returns the id of the remote entity this message refers to. Presumably this
    /// reads the field named in the `entity_messages!` invocation (`project_id`,
    /// `channel_id`) — confirm against the macro definition.
    fn remote_entity_id(&self) -> u64;
}
38
/// An [`EnvelopedMessage`] that is paired with a response message type.
///
/// The request/response pairs are listed in the `request_messages!` invocation
/// below.
pub trait RequestMessage: EnvelopedMessage {
    /// The message type associated with this request as its response.
    type Response: EnvelopedMessage;
}
42
/// Object-safe, type-erased view of a `TypedEnvelope<T>`.
///
/// It exposes the envelope metadata and allows recovering the concrete type via
/// [`Any`] without naming the payload type `T` statically.
pub trait AnyTypedEnvelope: 'static + Send + Sync {
    /// [`TypeId`] of the payload type `T`.
    fn payload_type_id(&self) -> TypeId;
    /// The payload type's [`EnvelopedMessage::NAME`].
    fn payload_type_name(&self) -> &'static str;
    /// Borrows the envelope as [`Any`] so it can be downcast by reference.
    fn as_any(&self) -> &dyn Any;
    /// Converts the boxed envelope into a boxed [`Any`] so it can be downcast by value.
    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
    /// `true` when the payload's priority is [`MessagePriority::Background`].
    fn is_background(&self) -> bool;
    /// The envelope's `original_sender_id` field.
    fn original_sender_id(&self) -> Option<PeerId>;
    /// The envelope's `sender_id` field.
    fn sender_id(&self) -> ConnectionId;
    /// The envelope's `message_id` field.
    fn message_id(&self) -> u32;
}
53
/// Priority class attached to every message type through
/// [`EnvelopedMessage::PRIORITY`].
///
/// This file only reads it in [`AnyTypedEnvelope::is_background`]; how the two
/// classes are scheduled is decided by code outside this file.
pub enum MessagePriority {
    /// Any message for which `is_background` reports `false`.
    Foreground,
    /// Messages for which `is_background` reports `true`.
    Background,
}
58
59impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
60 fn payload_type_id(&self) -> TypeId {
61 TypeId::of::<T>()
62 }
63
64 fn payload_type_name(&self) -> &'static str {
65 T::NAME
66 }
67
68 fn as_any(&self) -> &dyn Any {
69 self
70 }
71
72 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
73 self
74 }
75
76 fn is_background(&self) -> bool {
77 matches!(T::PRIORITY, MessagePriority::Background)
78 }
79
80 fn original_sender_id(&self) -> Option<PeerId> {
81 self.original_sender_id
82 }
83
84 fn sender_id(&self) -> ConnectionId {
85 self.sender_id
86 }
87
88 fn message_id(&self) -> u32 {
89 self.message_id
90 }
91}
92
93impl PeerId {
94 pub fn from_u64(peer_id: u64) -> Self {
95 let owner_id = (peer_id >> 32) as u32;
96 let id = peer_id as u32;
97 Self { owner_id, id }
98 }
99
100 pub fn as_u64(self) -> u64 {
101 ((self.owner_id as u64) << 32) | (self.id as u64)
102 }
103}
104
// `PeerId` consists of two `u32` fields (see `from_u64`), so it is cheap to copy
// by value. `Copy` needs a `Clone` impl, which is presumably derived in the
// generated message code — confirm.
impl Copy for PeerId {}

// Marks `PeerId` equality as a total equivalence relation; relies on a
// `PartialEq` impl that is presumably derived in the generated message code.
impl Eq for PeerId {}
108
109impl Ord for PeerId {
110 fn cmp(&self, other: &Self) -> cmp::Ordering {
111 self.owner_id
112 .cmp(&other.owner_id)
113 .then_with(|| self.id.cmp(&other.id))
114 }
115}
116
117impl PartialOrd for PeerId {
118 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
119 Some(self.cmp(other))
120 }
121}
122
123impl std::hash::Hash for PeerId {
124 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
125 self.owner_id.hash(state);
126 self.id.hash(state);
127 }
128}
129
130impl fmt::Display for PeerId {
131 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
132 write!(f, "{}/{}", self.owner_id, self.id)
133 }
134}
135
// Registers every message type together with its `MessagePriority` variant
// (`Foreground` or `Background`).
//
// NOTE(review): presumably the `messages!` macro (imported from the parent
// module) implements `EnvelopedMessage` for each listed type, using the second
// element as `PRIORITY` — confirm against the macro definition.
messages!(
    (Ack, Foreground),
    (AckBufferOperation, Background),
    (AckChannelMessage, Background),
    (AddNotification, Foreground),
    (AddProjectCollaborator, Foreground),
    (ApplyCodeAction, Background),
    (ApplyCodeActionResponse, Background),
    (ApplyCompletionAdditionalEdits, Background),
    (ApplyCompletionAdditionalEditsResponse, Background),
    (BufferReloaded, Foreground),
    (BufferSaved, Foreground),
    (Call, Foreground),
    (CallCanceled, Foreground),
    (CancelCall, Foreground),
    (ChannelMessageSent, Foreground),
    (ChannelMessageUpdate, Foreground),
    (CompleteWithLanguageModel, Background),
    (CopyProjectEntry, Foreground),
    (CountTokensWithLanguageModel, Background),
    (CountTokensResponse, Background),
    (CreateBufferForPeer, Foreground),
    (CreateChannel, Foreground),
    (CreateChannelResponse, Foreground),
    (CreateProjectEntry, Foreground),
    (CreateRoom, Foreground),
    (CreateRoomResponse, Foreground),
    (DeclineCall, Foreground),
    (DeleteChannel, Foreground),
    (DeleteNotification, Foreground),
    (DeleteProjectEntry, Foreground),
    (EndStream, Foreground),
    (Error, Foreground),
    (ExpandProjectEntry, Foreground),
    (ExpandProjectEntryResponse, Foreground),
    (Follow, Foreground),
    (FollowResponse, Foreground),
    (FormatBuffers, Foreground),
    (FormatBuffersResponse, Foreground),
    (FuzzySearchUsers, Foreground),
    (GetChannelMembers, Foreground),
    (GetChannelMembersResponse, Foreground),
    (GetChannelMessages, Background),
    (GetChannelMessagesById, Background),
    (GetChannelMessagesResponse, Background),
    (GetCodeActions, Background),
    (GetCodeActionsResponse, Background),
    (GetCompletions, Background),
    (GetCompletionsResponse, Background),
    (GetDefinition, Background),
    (GetDefinitionResponse, Background),
    (GetDocumentHighlights, Background),
    (GetDocumentHighlightsResponse, Background),
    (GetHover, Background),
    (GetHoverResponse, Background),
    (GetNotifications, Foreground),
    (GetNotificationsResponse, Foreground),
    (GetPrivateUserInfo, Foreground),
    (GetPrivateUserInfoResponse, Foreground),
    (GetProjectSymbols, Background),
    (GetProjectSymbolsResponse, Background),
    (GetReferences, Background),
    (GetReferencesResponse, Background),
    (GetTypeDefinition, Background),
    (GetTypeDefinitionResponse, Background),
    (GetImplementation, Background),
    (GetImplementationResponse, Background),
    (GetUsers, Foreground),
    (Hello, Foreground),
    (IncomingCall, Foreground),
    (InlayHints, Background),
    (InlayHintsResponse, Background),
    (InviteChannelMember, Foreground),
    (JoinChannel, Foreground),
    (JoinChannelBuffer, Foreground),
    (JoinChannelBufferResponse, Foreground),
    (JoinChannelChat, Foreground),
    (JoinChannelChatResponse, Foreground),
    (JoinProject, Foreground),
    (JoinHostedProject, Foreground),
    (JoinProjectResponse, Foreground),
    (JoinRoom, Foreground),
    (JoinRoomResponse, Foreground),
    (LanguageModelResponse, Background),
    (LeaveChannelBuffer, Background),
    (LeaveChannelChat, Foreground),
    (LeaveProject, Foreground),
    (LeaveRoom, Foreground),
    (MarkNotificationRead, Foreground),
    (MoveChannel, Foreground),
    (OnTypeFormatting, Background),
    (OnTypeFormattingResponse, Background),
    (OpenBufferById, Background),
    (OpenBufferByPath, Background),
    (OpenBufferForSymbol, Background),
    (OpenBufferForSymbolResponse, Background),
    (OpenBufferResponse, Background),
    (PerformRename, Background),
    (PerformRenameResponse, Background),
    (Ping, Foreground),
    (PrepareRename, Background),
    (PrepareRenameResponse, Background),
    (ProjectEntryResponse, Foreground),
    (RefreshInlayHints, Foreground),
    (RejoinChannelBuffers, Foreground),
    (RejoinChannelBuffersResponse, Foreground),
    (RejoinRoom, Foreground),
    (RejoinRoomResponse, Foreground),
    (ReloadBuffers, Foreground),
    (ReloadBuffersResponse, Foreground),
    (RemoveChannelMember, Foreground),
    (RemoveChannelMessage, Foreground),
    (UpdateChannelMessage, Foreground),
    (RemoveContact, Foreground),
    (RemoveProjectCollaborator, Foreground),
    (RenameChannel, Foreground),
    (RenameChannelResponse, Foreground),
    (RenameProjectEntry, Foreground),
    (RequestContact, Foreground),
    (ResolveCompletionDocumentation, Background),
    (ResolveCompletionDocumentationResponse, Background),
    (ResolveInlayHint, Background),
    (ResolveInlayHintResponse, Background),
    (RespondToChannelInvite, Foreground),
    (RespondToContactRequest, Foreground),
    (RoomUpdated, Foreground),
    (SaveBuffer, Foreground),
    (SetChannelMemberRole, Foreground),
    (SetChannelVisibility, Foreground),
    (SearchProject, Background),
    (SearchProjectResponse, Background),
    (SendChannelMessage, Background),
    (SendChannelMessageResponse, Background),
    (ShareProject, Foreground),
    (ShareProjectResponse, Foreground),
    (ShowContacts, Foreground),
    (StartLanguageServer, Foreground),
    (SynchronizeBuffers, Foreground),
    (SynchronizeBuffersResponse, Foreground),
    (Test, Foreground),
    (Unfollow, Foreground),
    (UnshareProject, Foreground),
    (UpdateBuffer, Foreground),
    (UpdateBufferFile, Foreground),
    (UpdateChannelBuffer, Foreground),
    (UpdateChannelBufferCollaborators, Foreground),
    (UpdateChannels, Foreground),
    (UpdateUserChannels, Foreground),
    (UpdateContacts, Foreground),
    (UpdateDiagnosticSummary, Foreground),
    (UpdateDiffBase, Foreground),
    (UpdateFollowers, Foreground),
    (UpdateInviteInfo, Foreground),
    (UpdateLanguageServer, Foreground),
    (UpdateParticipantLocation, Foreground),
    (UpdateProject, Foreground),
    (UpdateProjectCollaborator, Foreground),
    (UpdateWorktree, Foreground),
    (UpdateWorktreeSettings, Foreground),
    (UsersResponse, Foreground),
    (LspExtExpandMacro, Background),
    (LspExtExpandMacroResponse, Background),
    (SetRoomParticipantRole, Foreground),
);
300
// Pairs each request message type with the message type sent back as its
// response, written as `(Request, Response)`.
//
// NOTE(review): presumably the `request_messages!` macro implements
// `RequestMessage` for the first type with `Response` set to the second —
// confirm against the macro definition.
request_messages!(
    (ApplyCodeAction, ApplyCodeActionResponse),
    (
        ApplyCompletionAdditionalEdits,
        ApplyCompletionAdditionalEditsResponse
    ),
    (Call, Ack),
    (CancelCall, Ack),
    (CopyProjectEntry, ProjectEntryResponse),
    (CompleteWithLanguageModel, LanguageModelResponse),
    (CountTokensWithLanguageModel, CountTokensResponse),
    (CreateChannel, CreateChannelResponse),
    (CreateProjectEntry, ProjectEntryResponse),
    (CreateRoom, CreateRoomResponse),
    (DeclineCall, Ack),
    (DeleteChannel, Ack),
    (DeleteProjectEntry, ProjectEntryResponse),
    (ExpandProjectEntry, ExpandProjectEntryResponse),
    (Follow, FollowResponse),
    (FormatBuffers, FormatBuffersResponse),
    (FuzzySearchUsers, UsersResponse),
    (GetChannelMembers, GetChannelMembersResponse),
    (GetChannelMessages, GetChannelMessagesResponse),
    (GetChannelMessagesById, GetChannelMessagesResponse),
    (GetCodeActions, GetCodeActionsResponse),
    (GetCompletions, GetCompletionsResponse),
    (GetDefinition, GetDefinitionResponse),
    (GetImplementation, GetImplementationResponse),
    (GetDocumentHighlights, GetDocumentHighlightsResponse),
    (GetHover, GetHoverResponse),
    (GetNotifications, GetNotificationsResponse),
    (GetPrivateUserInfo, GetPrivateUserInfoResponse),
    (GetProjectSymbols, GetProjectSymbolsResponse),
    (GetReferences, GetReferencesResponse),
    (GetTypeDefinition, GetTypeDefinitionResponse),
    (GetUsers, UsersResponse),
    (IncomingCall, Ack),
    (InlayHints, InlayHintsResponse),
    (InviteChannelMember, Ack),
    (JoinChannel, JoinRoomResponse),
    (JoinChannelBuffer, JoinChannelBufferResponse),
    (JoinChannelChat, JoinChannelChatResponse),
    (JoinHostedProject, JoinProjectResponse),
    (JoinProject, JoinProjectResponse),
    (JoinRoom, JoinRoomResponse),
    (LeaveChannelBuffer, Ack),
    (LeaveRoom, Ack),
    (MarkNotificationRead, Ack),
    (MoveChannel, Ack),
    (OnTypeFormatting, OnTypeFormattingResponse),
    (OpenBufferById, OpenBufferResponse),
    (OpenBufferByPath, OpenBufferResponse),
    (OpenBufferForSymbol, OpenBufferForSymbolResponse),
    (PerformRename, PerformRenameResponse),
    (Ping, Ack),
    (PrepareRename, PrepareRenameResponse),
    (RefreshInlayHints, Ack),
    (RejoinChannelBuffers, RejoinChannelBuffersResponse),
    (RejoinRoom, RejoinRoomResponse),
    (ReloadBuffers, ReloadBuffersResponse),
    (RemoveChannelMember, Ack),
    (RemoveChannelMessage, Ack),
    (UpdateChannelMessage, Ack),
    (RemoveContact, Ack),
    (RenameChannel, RenameChannelResponse),
    (RenameProjectEntry, ProjectEntryResponse),
    (RequestContact, Ack),
    (
        ResolveCompletionDocumentation,
        ResolveCompletionDocumentationResponse
    ),
    (ResolveInlayHint, ResolveInlayHintResponse),
    (RespondToChannelInvite, Ack),
    (RespondToContactRequest, Ack),
    (SaveBuffer, BufferSaved),
    (SearchProject, SearchProjectResponse),
    (SendChannelMessage, SendChannelMessageResponse),
    (SetChannelMemberRole, Ack),
    (SetChannelVisibility, Ack),
    (ShareProject, ShareProjectResponse),
    (SynchronizeBuffers, SynchronizeBuffersResponse),
    (Test, Test),
    (UpdateBuffer, Ack),
    (UpdateParticipantLocation, Ack),
    (UpdateProject, Ack),
    (UpdateWorktree, Ack),
    (LspExtExpandMacro, LspExtExpandMacroResponse),
    (SetRoomParticipantRole, Ack),
);
390
// Messages that are scoped to a project.
//
// NOTE(review): presumably the `entity_messages!` macro implements
// `EntityMessage` for each listed type, with `remote_entity_id` reading the
// `project_id` field and `Entity` set to `ShareProject` — confirm against the
// macro definition.
entity_messages!(
    {project_id, ShareProject},
    AddProjectCollaborator,
    ApplyCodeAction,
    ApplyCompletionAdditionalEdits,
    BufferReloaded,
    BufferSaved,
    CopyProjectEntry,
    CreateBufferForPeer,
    CreateProjectEntry,
    DeleteProjectEntry,
    ExpandProjectEntry,
    FormatBuffers,
    GetCodeActions,
    GetCompletions,
    GetDefinition,
    GetImplementation,
    GetDocumentHighlights,
    GetHover,
    GetProjectSymbols,
    GetReferences,
    GetTypeDefinition,
    InlayHints,
    JoinProject,
    LeaveProject,
    OnTypeFormatting,
    OpenBufferById,
    OpenBufferByPath,
    OpenBufferForSymbol,
    PerformRename,
    PrepareRename,
    RefreshInlayHints,
    ReloadBuffers,
    RemoveProjectCollaborator,
    RenameProjectEntry,
    ResolveCompletionDocumentation,
    ResolveInlayHint,
    SaveBuffer,
    SearchProject,
    StartLanguageServer,
    SynchronizeBuffers,
    UnshareProject,
    UpdateBuffer,
    UpdateBufferFile,
    UpdateDiagnosticSummary,
    UpdateDiffBase,
    UpdateLanguageServer,
    UpdateProject,
    UpdateProjectCollaborator,
    UpdateWorktree,
    UpdateWorktreeSettings,
    LspExtExpandMacro,
);
444
// Messages that are scoped to a channel.
//
// NOTE(review): presumably the `entity_messages!` macro implements
// `EntityMessage` for each listed type, with `remote_entity_id` reading the
// `channel_id` field and `Entity` set to `Channel` — confirm against the macro
// definition.
entity_messages!(
    {channel_id, Channel},
    ChannelMessageSent,
    ChannelMessageUpdate,
    RemoveChannelMessage,
    UpdateChannelMessage,
    UpdateChannelBuffer,
    UpdateChannelBufferCollaborators,
);
454
/// Number of bytes in one kibibyte.
const KIB: usize = 1024;
/// Number of bytes in one mebibyte.
const MIB: usize = KIB * 1024;
/// Upper bound passed to `shrink_to` on a `MessageStream`'s reusable encoding
/// buffer after each message, so one large message does not pin its memory.
const MAX_BUFFER_LEN: usize = MIB;
458
/// A stream of protobuf messages.
///
/// Wraps an underlying WebSocket sink and/or stream `S`. Envelopes are
/// protobuf-encoded and zstd-compressed on `write`, and decompressed and decoded
/// on `read`.
pub struct MessageStream<S> {
    /// The wrapped transport.
    stream: S,
    /// Scratch buffer holding the uncompressed protobuf bytes of the message being
    /// written or read; it is cleared and shrunk to at most `MAX_BUFFER_LEN` after
    /// each message.
    encoding_buffer: Vec<u8>,
}
464
/// A single unit transferred by a [`MessageStream`].
// `Envelope` is much larger than the unit variants; the size difference is
// accepted here rather than boxing the payload.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum Message {
    /// A protobuf envelope, sent as a binary WebSocket message.
    Envelope(Envelope),
    /// A WebSocket ping frame (payload is not preserved).
    Ping,
    /// A WebSocket pong frame (payload is not preserved).
    Pong,
}
472
473impl<S> MessageStream<S> {
474 pub fn new(stream: S) -> Self {
475 Self {
476 stream,
477 encoding_buffer: Vec::new(),
478 }
479 }
480
481 pub fn inner_mut(&mut self) -> &mut S {
482 &mut self.stream
483 }
484}
485
486impl<S> MessageStream<S>
487where
488 S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
489{
490 pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
491 #[cfg(any(test, feature = "test-support"))]
492 const COMPRESSION_LEVEL: i32 = -7;
493
494 #[cfg(not(any(test, feature = "test-support")))]
495 const COMPRESSION_LEVEL: i32 = 4;
496
497 match message {
498 Message::Envelope(message) => {
499 self.encoding_buffer.reserve(message.encoded_len());
500 message
501 .encode(&mut self.encoding_buffer)
502 .map_err(io::Error::from)?;
503 let buffer =
504 zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
505 .unwrap();
506
507 self.encoding_buffer.clear();
508 self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
509 self.stream.send(WebSocketMessage::Binary(buffer)).await?;
510 }
511 Message::Ping => {
512 self.stream
513 .send(WebSocketMessage::Ping(Default::default()))
514 .await?;
515 }
516 Message::Pong => {
517 self.stream
518 .send(WebSocketMessage::Pong(Default::default()))
519 .await?;
520 }
521 }
522
523 Ok(())
524 }
525}
526
527impl<S> MessageStream<S>
528where
529 S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
530{
531 pub async fn read(&mut self) -> Result<(Message, Instant), anyhow::Error> {
532 while let Some(bytes) = self.stream.next().await {
533 let received_at = Instant::now();
534 match bytes? {
535 WebSocketMessage::Binary(bytes) => {
536 zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
537 let envelope = Envelope::decode(self.encoding_buffer.as_slice())
538 .map_err(io::Error::from)?;
539
540 self.encoding_buffer.clear();
541 self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
542 return Ok((Message::Envelope(envelope), received_at));
543 }
544 WebSocketMessage::Ping(_) => return Ok((Message::Ping, received_at)),
545 WebSocketMessage::Pong(_) => return Ok((Message::Pong, received_at)),
546 WebSocketMessage::Close(_) => break,
547 _ => {}
548 }
549 }
550 Err(anyhow!("connection closed"))
551 }
552}
553
554impl From<Timestamp> for SystemTime {
555 fn from(val: Timestamp) -> Self {
556 UNIX_EPOCH
557 .checked_add(Duration::new(val.seconds, val.nanos))
558 .unwrap()
559 }
560}
561
562impl From<SystemTime> for Timestamp {
563 fn from(time: SystemTime) -> Self {
564 let duration = time.duration_since(UNIX_EPOCH).unwrap();
565 Self {
566 seconds: duration.as_secs(),
567 nanos: duration.subsec_nanos(),
568 }
569 }
570}
571
572impl From<u128> for Nonce {
573 fn from(nonce: u128) -> Self {
574 let upper_half = (nonce >> 64) as u64;
575 let lower_half = nonce as u64;
576 Self {
577 upper_half,
578 lower_half,
579 }
580 }
581}
582
583impl From<Nonce> for u128 {
584 fn from(nonce: Nonce) -> Self {
585 let upper_half = (nonce.upper_half as u128) << 64;
586 let lower_half = nonce.lower_half as u128;
587 upper_half | lower_half
588 }
589}
590
591pub fn split_worktree_update(
592 mut message: UpdateWorktree,
593 max_chunk_size: usize,
594) -> impl Iterator<Item = UpdateWorktree> {
595 let mut done_files = false;
596
597 let mut repository_map = message
598 .updated_repositories
599 .into_iter()
600 .map(|repo| (repo.work_directory_id, repo))
601 .collect::<HashMap<_, _>>();
602
603 iter::from_fn(move || {
604 if done_files {
605 return None;
606 }
607
608 let updated_entries_chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
609 let updated_entries: Vec<_> = message
610 .updated_entries
611 .drain(..updated_entries_chunk_size)
612 .collect();
613
614 let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size);
615 let removed_entries = message
616 .removed_entries
617 .drain(..removed_entries_chunk_size)
618 .collect();
619
620 done_files = message.updated_entries.is_empty() && message.removed_entries.is_empty();
621
622 let mut updated_repositories = Vec::new();
623
624 if !repository_map.is_empty() {
625 for entry in &updated_entries {
626 if let Some(repo) = repository_map.remove(&entry.id) {
627 updated_repositories.push(repo)
628 }
629 }
630 }
631
632 let removed_repositories = if done_files {
633 mem::take(&mut message.removed_repositories)
634 } else {
635 Default::default()
636 };
637
638 if done_files {
639 updated_repositories.extend(mem::take(&mut repository_map).into_values());
640 }
641
642 Some(UpdateWorktree {
643 project_id: message.project_id,
644 worktree_id: message.worktree_id,
645 root_name: message.root_name.clone(),
646 abs_path: message.abs_path.clone(),
647 updated_entries,
648 removed_entries,
649 scan_id: message.scan_id,
650 is_last_update: done_files && message.is_last_update,
651 updated_repositories,
652 removed_repositories,
653 })
654 })
655}
656
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an envelope carrying an `UpdateWorktree` whose root name is
    /// `"abcdefg"` repeated `repeats` times.
    fn worktree_envelope(repeats: usize) -> Message {
        Message::Envelope(Envelope {
            payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
                root_name: "abcdefg".repeat(repeats),
                ..Default::default()
            })),
            ..Default::default()
        })
    }

    /// The scratch buffer must never stay above `MAX_BUFFER_LEN`, whether the
    /// message just written or read was small or large.
    #[gpui::test]
    async fn test_buffer_size() {
        let (tx, rx) = futures::channel::mpsc::unbounded();
        let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
        // One small message, then one larger than the buffer cap.
        for repeats in [10, 1000000] {
            sink.write(worktree_envelope(repeats)).await.unwrap();
            assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
        }

        let mut stream = MessageStream::new(rx.map(anyhow::Ok));
        // Read back both messages written above.
        for _ in 0..2 {
            stream.read().await.unwrap();
            assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
        }
    }

    /// `as_u64` followed by `from_u64` must round-trip, including at the `u32`
    /// extremes of either field.
    #[gpui::test]
    fn test_converting_peer_id_from_and_to_u64() {
        let cases = [
            (10, 3),
            (u32::MAX, 3),
            (10, u32::MAX),
            (u32::MAX, u32::MAX),
        ];
        for (owner_id, id) in cases {
            let peer_id = PeerId { owner_id, id };
            assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
        }
    }
}