1#![allow(non_snake_case)]
2
3use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
4use anyhow::{anyhow, Result};
5use async_tungstenite::tungstenite::Message as WebSocketMessage;
6use collections::HashMap;
7use futures::{SinkExt as _, StreamExt as _};
8use prost::Message as _;
9use serde::Serialize;
10use std::any::{Any, TypeId};
11use std::{
12 cmp,
13 fmt::Debug,
14 io, iter,
15 time::{Duration, SystemTime, UNIX_EPOCH},
16};
17use std::{fmt, mem};
18
19include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
20
21pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
22 const NAME: &'static str;
23 const PRIORITY: MessagePriority;
24 fn into_envelope(
25 self,
26 id: u32,
27 responding_to: Option<u32>,
28 original_sender_id: Option<PeerId>,
29 ) -> Envelope;
30 fn from_envelope(envelope: Envelope) -> Option<Self>;
31}
32
33pub trait EntityMessage: EnvelopedMessage {
34 type Entity;
35 fn remote_entity_id(&self) -> u64;
36}
37
38pub trait RequestMessage: EnvelopedMessage {
39 type Response: EnvelopedMessage;
40}
41
42pub trait AnyTypedEnvelope: 'static + Send + Sync {
43 fn payload_type_id(&self) -> TypeId;
44 fn payload_type_name(&self) -> &'static str;
45 fn as_any(&self) -> &dyn Any;
46 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
47 fn is_background(&self) -> bool;
48 fn original_sender_id(&self) -> Option<PeerId>;
49 fn sender_id(&self) -> ConnectionId;
50 fn message_id(&self) -> u32;
51}
52
53pub enum MessagePriority {
54 Foreground,
55 Background,
56}
57
58impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
59 fn payload_type_id(&self) -> TypeId {
60 TypeId::of::<T>()
61 }
62
63 fn payload_type_name(&self) -> &'static str {
64 T::NAME
65 }
66
67 fn as_any(&self) -> &dyn Any {
68 self
69 }
70
71 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
72 self
73 }
74
75 fn is_background(&self) -> bool {
76 matches!(T::PRIORITY, MessagePriority::Background)
77 }
78
79 fn original_sender_id(&self) -> Option<PeerId> {
80 self.original_sender_id
81 }
82
83 fn sender_id(&self) -> ConnectionId {
84 self.sender_id
85 }
86
87 fn message_id(&self) -> u32 {
88 self.message_id
89 }
90}
91
92impl PeerId {
93 pub fn from_u64(peer_id: u64) -> Self {
94 let owner_id = (peer_id >> 32) as u32;
95 let id = peer_id as u32;
96 Self { owner_id, id }
97 }
98
99 pub fn as_u64(self) -> u64 {
100 ((self.owner_id as u64) << 32) | (self.id as u64)
101 }
102}
103
104impl Copy for PeerId {}
105
106impl Eq for PeerId {}
107
108impl Ord for PeerId {
109 fn cmp(&self, other: &Self) -> cmp::Ordering {
110 self.owner_id
111 .cmp(&other.owner_id)
112 .then_with(|| self.id.cmp(&other.id))
113 }
114}
115
116impl PartialOrd for PeerId {
117 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
118 Some(self.cmp(other))
119 }
120}
121
122impl std::hash::Hash for PeerId {
123 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
124 self.owner_id.hash(state);
125 self.id.hash(state);
126 }
127}
128
129impl fmt::Display for PeerId {
130 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
131 write!(f, "{}/{}", self.owner_id, self.id)
132 }
133}
134
135messages!(
136 (Ack, Foreground),
137 (AckBufferOperation, Background),
138 (AckChannelMessage, Background),
139 (AddNotification, Foreground),
140 (AddProjectCollaborator, Foreground),
141 (ApplyCodeAction, Background),
142 (ApplyCodeActionResponse, Background),
143 (ApplyCompletionAdditionalEdits, Background),
144 (ApplyCompletionAdditionalEditsResponse, Background),
145 (BufferReloaded, Foreground),
146 (BufferSaved, Foreground),
147 (Call, Foreground),
148 (CallCanceled, Foreground),
149 (CancelCall, Foreground),
150 (ChannelMessageSent, Foreground),
151 (CopyProjectEntry, Foreground),
152 (CreateBufferForPeer, Foreground),
153 (CreateChannel, Foreground),
154 (CreateChannelResponse, Foreground),
155 (CreateProjectEntry, Foreground),
156 (CreateRoom, Foreground),
157 (CreateRoomResponse, Foreground),
158 (DeclineCall, Foreground),
159 (DeleteChannel, Foreground),
160 (DeleteNotification, Foreground),
161 (DeleteProjectEntry, Foreground),
162 (Error, Foreground),
163 (ExpandProjectEntry, Foreground),
164 (ExpandProjectEntryResponse, Foreground),
165 (Follow, Foreground),
166 (FollowResponse, Foreground),
167 (FormatBuffers, Foreground),
168 (FormatBuffersResponse, Foreground),
169 (FuzzySearchUsers, Foreground),
170 (GetChannelMembers, Foreground),
171 (GetChannelMembersResponse, Foreground),
172 (GetChannelMessages, Background),
173 (GetChannelMessagesById, Background),
174 (GetChannelMessagesResponse, Background),
175 (GetCodeActions, Background),
176 (GetCodeActionsResponse, Background),
177 (GetCompletions, Background),
178 (GetCompletionsResponse, Background),
179 (GetDefinition, Background),
180 (GetDefinitionResponse, Background),
181 (GetDocumentHighlights, Background),
182 (GetDocumentHighlightsResponse, Background),
183 (GetHover, Background),
184 (GetHoverResponse, Background),
185 (GetNotifications, Foreground),
186 (GetNotificationsResponse, Foreground),
187 (GetPrivateUserInfo, Foreground),
188 (GetPrivateUserInfoResponse, Foreground),
189 (GetProjectSymbols, Background),
190 (GetProjectSymbolsResponse, Background),
191 (GetReferences, Background),
192 (GetReferencesResponse, Background),
193 (GetTypeDefinition, Background),
194 (GetTypeDefinitionResponse, Background),
195 (GetUsers, Foreground),
196 (Hello, Foreground),
197 (IncomingCall, Foreground),
198 (InlayHints, Background),
199 (InlayHintsResponse, Background),
200 (InviteChannelMember, Foreground),
201 (JoinChannel, Foreground),
202 (JoinChannelBuffer, Foreground),
203 (JoinChannelBufferResponse, Foreground),
204 (JoinChannelChat, Foreground),
205 (JoinChannelChatResponse, Foreground),
206 (JoinProject, Foreground),
207 (JoinProjectResponse, Foreground),
208 (JoinRoom, Foreground),
209 (JoinRoomResponse, Foreground),
210 (LeaveChannelBuffer, Background),
211 (LeaveChannelChat, Foreground),
212 (LeaveProject, Foreground),
213 (LeaveRoom, Foreground),
214 (MarkNotificationRead, Foreground),
215 (MoveChannel, Foreground),
216 (OnTypeFormatting, Background),
217 (OnTypeFormattingResponse, Background),
218 (OpenBufferById, Background),
219 (OpenBufferByPath, Background),
220 (OpenBufferForSymbol, Background),
221 (OpenBufferForSymbolResponse, Background),
222 (OpenBufferResponse, Background),
223 (PerformRename, Background),
224 (PerformRenameResponse, Background),
225 (Ping, Foreground),
226 (PrepareRename, Background),
227 (PrepareRenameResponse, Background),
228 (ProjectEntryResponse, Foreground),
229 (RefreshInlayHints, Foreground),
230 (RejoinChannelBuffers, Foreground),
231 (RejoinChannelBuffersResponse, Foreground),
232 (RejoinRoom, Foreground),
233 (RejoinRoomResponse, Foreground),
234 (ReloadBuffers, Foreground),
235 (ReloadBuffersResponse, Foreground),
236 (RemoveChannelMember, Foreground),
237 (RemoveChannelMessage, Foreground),
238 (RemoveContact, Foreground),
239 (RemoveProjectCollaborator, Foreground),
240 (RenameChannel, Foreground),
241 (RenameChannelResponse, Foreground),
242 (RenameProjectEntry, Foreground),
243 (RequestContact, Foreground),
244 (ResolveCompletionDocumentation, Background),
245 (ResolveCompletionDocumentationResponse, Background),
246 (ResolveInlayHint, Background),
247 (ResolveInlayHintResponse, Background),
248 (RespondToChannelInvite, Foreground),
249 (RespondToContactRequest, Foreground),
250 (RoomUpdated, Foreground),
251 (SaveBuffer, Foreground),
252 (SetChannelMemberRole, Foreground),
253 (SetChannelVisibility, Foreground),
254 (SearchProject, Background),
255 (SearchProjectResponse, Background),
256 (SendChannelMessage, Background),
257 (SendChannelMessageResponse, Background),
258 (ShareProject, Foreground),
259 (ShareProjectResponse, Foreground),
260 (ShowContacts, Foreground),
261 (StartLanguageServer, Foreground),
262 (SynchronizeBuffers, Foreground),
263 (SynchronizeBuffersResponse, Foreground),
264 (Test, Foreground),
265 (Unfollow, Foreground),
266 (UnshareProject, Foreground),
267 (UpdateBuffer, Foreground),
268 (UpdateBufferFile, Foreground),
269 (UpdateChannelBuffer, Foreground),
270 (UpdateChannelBufferCollaborators, Foreground),
271 (UpdateChannels, Foreground),
272 (UpdateContacts, Foreground),
273 (UpdateDiagnosticSummary, Foreground),
274 (UpdateDiffBase, Foreground),
275 (UpdateFollowers, Foreground),
276 (UpdateInviteInfo, Foreground),
277 (UpdateLanguageServer, Foreground),
278 (UpdateParticipantLocation, Foreground),
279 (UpdateProject, Foreground),
280 (UpdateProjectCollaborator, Foreground),
281 (UpdateWorktree, Foreground),
282 (UpdateWorktreeSettings, Foreground),
283 (UsersResponse, Foreground),
284 (LspExtExpandMacro, Background),
285 (LspExtExpandMacroResponse, Background),
286 (SetRoomParticipantRole, Foreground),
287);
288
289request_messages!(
290 (ApplyCodeAction, ApplyCodeActionResponse),
291 (
292 ApplyCompletionAdditionalEdits,
293 ApplyCompletionAdditionalEditsResponse
294 ),
295 (Call, Ack),
296 (CancelCall, Ack),
297 (CopyProjectEntry, ProjectEntryResponse),
298 (CreateChannel, CreateChannelResponse),
299 (CreateProjectEntry, ProjectEntryResponse),
300 (CreateRoom, CreateRoomResponse),
301 (DeclineCall, Ack),
302 (DeleteChannel, Ack),
303 (DeleteProjectEntry, ProjectEntryResponse),
304 (ExpandProjectEntry, ExpandProjectEntryResponse),
305 (Follow, FollowResponse),
306 (FormatBuffers, FormatBuffersResponse),
307 (FuzzySearchUsers, UsersResponse),
308 (GetChannelMembers, GetChannelMembersResponse),
309 (GetChannelMessages, GetChannelMessagesResponse),
310 (GetChannelMessagesById, GetChannelMessagesResponse),
311 (GetCodeActions, GetCodeActionsResponse),
312 (GetCompletions, GetCompletionsResponse),
313 (GetDefinition, GetDefinitionResponse),
314 (GetDocumentHighlights, GetDocumentHighlightsResponse),
315 (GetHover, GetHoverResponse),
316 (GetNotifications, GetNotificationsResponse),
317 (GetPrivateUserInfo, GetPrivateUserInfoResponse),
318 (GetProjectSymbols, GetProjectSymbolsResponse),
319 (GetReferences, GetReferencesResponse),
320 (GetTypeDefinition, GetTypeDefinitionResponse),
321 (GetUsers, UsersResponse),
322 (IncomingCall, Ack),
323 (InlayHints, InlayHintsResponse),
324 (InviteChannelMember, Ack),
325 (JoinChannel, JoinRoomResponse),
326 (JoinChannelBuffer, JoinChannelBufferResponse),
327 (JoinChannelChat, JoinChannelChatResponse),
328 (JoinProject, JoinProjectResponse),
329 (JoinRoom, JoinRoomResponse),
330 (LeaveChannelBuffer, Ack),
331 (LeaveRoom, Ack),
332 (MarkNotificationRead, Ack),
333 (MoveChannel, Ack),
334 (OnTypeFormatting, OnTypeFormattingResponse),
335 (OpenBufferById, OpenBufferResponse),
336 (OpenBufferByPath, OpenBufferResponse),
337 (OpenBufferForSymbol, OpenBufferForSymbolResponse),
338 (PerformRename, PerformRenameResponse),
339 (Ping, Ack),
340 (PrepareRename, PrepareRenameResponse),
341 (RefreshInlayHints, Ack),
342 (RejoinChannelBuffers, RejoinChannelBuffersResponse),
343 (RejoinRoom, RejoinRoomResponse),
344 (ReloadBuffers, ReloadBuffersResponse),
345 (RemoveChannelMember, Ack),
346 (RemoveChannelMessage, Ack),
347 (RemoveContact, Ack),
348 (RenameChannel, RenameChannelResponse),
349 (RenameProjectEntry, ProjectEntryResponse),
350 (RequestContact, Ack),
351 (
352 ResolveCompletionDocumentation,
353 ResolveCompletionDocumentationResponse
354 ),
355 (ResolveInlayHint, ResolveInlayHintResponse),
356 (RespondToChannelInvite, Ack),
357 (RespondToContactRequest, Ack),
358 (SaveBuffer, BufferSaved),
359 (SearchProject, SearchProjectResponse),
360 (SendChannelMessage, SendChannelMessageResponse),
361 (SetChannelMemberRole, Ack),
362 (SetChannelVisibility, Ack),
363 (ShareProject, ShareProjectResponse),
364 (SynchronizeBuffers, SynchronizeBuffersResponse),
365 (Test, Test),
366 (UpdateBuffer, Ack),
367 (UpdateParticipantLocation, Ack),
368 (UpdateProject, Ack),
369 (UpdateWorktree, Ack),
370 (LspExtExpandMacro, LspExtExpandMacroResponse),
371 (SetRoomParticipantRole, Ack),
372);
373
374entity_messages!(
375 {project_id, ShareProject},
376 AddProjectCollaborator,
377 ApplyCodeAction,
378 ApplyCompletionAdditionalEdits,
379 BufferReloaded,
380 BufferSaved,
381 CopyProjectEntry,
382 CreateBufferForPeer,
383 CreateProjectEntry,
384 DeleteProjectEntry,
385 ExpandProjectEntry,
386 FormatBuffers,
387 GetCodeActions,
388 GetCompletions,
389 GetDefinition,
390 GetDocumentHighlights,
391 GetHover,
392 GetProjectSymbols,
393 GetReferences,
394 GetTypeDefinition,
395 InlayHints,
396 JoinProject,
397 LeaveProject,
398 OnTypeFormatting,
399 OpenBufferById,
400 OpenBufferByPath,
401 OpenBufferForSymbol,
402 PerformRename,
403 PrepareRename,
404 RefreshInlayHints,
405 ReloadBuffers,
406 RemoveProjectCollaborator,
407 RenameProjectEntry,
408 ResolveCompletionDocumentation,
409 ResolveInlayHint,
410 SaveBuffer,
411 SearchProject,
412 StartLanguageServer,
413 SynchronizeBuffers,
414 UnshareProject,
415 UpdateBuffer,
416 UpdateBufferFile,
417 UpdateDiagnosticSummary,
418 UpdateDiffBase,
419 UpdateLanguageServer,
420 UpdateProject,
421 UpdateProjectCollaborator,
422 UpdateWorktree,
423 UpdateWorktreeSettings,
424 LspExtExpandMacro,
425);
426
427entity_messages!(
428 {channel_id, Channel},
429 ChannelMessageSent,
430 RemoveChannelMessage,
431 UpdateChannelBuffer,
432 UpdateChannelBufferCollaborators,
433);
434
435const KIB: usize = 1024;
436const MIB: usize = KIB * 1024;
437const MAX_BUFFER_LEN: usize = MIB;
438
439/// A stream of protobuf messages.
440pub struct MessageStream<S> {
441 stream: S,
442 encoding_buffer: Vec<u8>,
443}
444
445#[allow(clippy::large_enum_variant)]
446#[derive(Debug)]
447pub enum Message {
448 Envelope(Envelope),
449 Ping,
450 Pong,
451}
452
453impl<S> MessageStream<S> {
454 pub fn new(stream: S) -> Self {
455 Self {
456 stream,
457 encoding_buffer: Vec::new(),
458 }
459 }
460
461 pub fn inner_mut(&mut self) -> &mut S {
462 &mut self.stream
463 }
464}
465
466impl<S> MessageStream<S>
467where
468 S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
469{
470 pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
471 #[cfg(any(test, feature = "test-support"))]
472 const COMPRESSION_LEVEL: i32 = -7;
473
474 #[cfg(not(any(test, feature = "test-support")))]
475 const COMPRESSION_LEVEL: i32 = 4;
476
477 match message {
478 Message::Envelope(message) => {
479 self.encoding_buffer.reserve(message.encoded_len());
480 message
481 .encode(&mut self.encoding_buffer)
482 .map_err(io::Error::from)?;
483 let buffer =
484 zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
485 .unwrap();
486
487 self.encoding_buffer.clear();
488 self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
489 self.stream.send(WebSocketMessage::Binary(buffer)).await?;
490 }
491 Message::Ping => {
492 self.stream
493 .send(WebSocketMessage::Ping(Default::default()))
494 .await?;
495 }
496 Message::Pong => {
497 self.stream
498 .send(WebSocketMessage::Pong(Default::default()))
499 .await?;
500 }
501 }
502
503 Ok(())
504 }
505}
506
507impl<S> MessageStream<S>
508where
509 S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
510{
511 pub async fn read(&mut self) -> Result<Message, anyhow::Error> {
512 while let Some(bytes) = self.stream.next().await {
513 match bytes? {
514 WebSocketMessage::Binary(bytes) => {
515 zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
516 let envelope = Envelope::decode(self.encoding_buffer.as_slice())
517 .map_err(io::Error::from)?;
518
519 self.encoding_buffer.clear();
520 self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
521 return Ok(Message::Envelope(envelope));
522 }
523 WebSocketMessage::Ping(_) => return Ok(Message::Ping),
524 WebSocketMessage::Pong(_) => return Ok(Message::Pong),
525 WebSocketMessage::Close(_) => break,
526 _ => {}
527 }
528 }
529 Err(anyhow!("connection closed"))
530 }
531}
532
533impl From<Timestamp> for SystemTime {
534 fn from(val: Timestamp) -> Self {
535 UNIX_EPOCH
536 .checked_add(Duration::new(val.seconds, val.nanos))
537 .unwrap()
538 }
539}
540
541impl From<SystemTime> for Timestamp {
542 fn from(time: SystemTime) -> Self {
543 let duration = time.duration_since(UNIX_EPOCH).unwrap();
544 Self {
545 seconds: duration.as_secs(),
546 nanos: duration.subsec_nanos(),
547 }
548 }
549}
550
551impl From<u128> for Nonce {
552 fn from(nonce: u128) -> Self {
553 let upper_half = (nonce >> 64) as u64;
554 let lower_half = nonce as u64;
555 Self {
556 upper_half,
557 lower_half,
558 }
559 }
560}
561
562impl From<Nonce> for u128 {
563 fn from(nonce: Nonce) -> Self {
564 let upper_half = (nonce.upper_half as u128) << 64;
565 let lower_half = nonce.lower_half as u128;
566 upper_half | lower_half
567 }
568}
569
570pub fn split_worktree_update(
571 mut message: UpdateWorktree,
572 max_chunk_size: usize,
573) -> impl Iterator<Item = UpdateWorktree> {
574 let mut done_files = false;
575
576 let mut repository_map = message
577 .updated_repositories
578 .into_iter()
579 .map(|repo| (repo.work_directory_id, repo))
580 .collect::<HashMap<_, _>>();
581
582 iter::from_fn(move || {
583 if done_files {
584 return None;
585 }
586
587 let updated_entries_chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
588 let updated_entries: Vec<_> = message
589 .updated_entries
590 .drain(..updated_entries_chunk_size)
591 .collect();
592
593 let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size);
594 let removed_entries = message
595 .removed_entries
596 .drain(..removed_entries_chunk_size)
597 .collect();
598
599 done_files = message.updated_entries.is_empty() && message.removed_entries.is_empty();
600
601 let mut updated_repositories = Vec::new();
602
603 if !repository_map.is_empty() {
604 for entry in &updated_entries {
605 if let Some(repo) = repository_map.remove(&entry.id) {
606 updated_repositories.push(repo)
607 }
608 }
609 }
610
611 let removed_repositories = if done_files {
612 mem::take(&mut message.removed_repositories)
613 } else {
614 Default::default()
615 };
616
617 if done_files {
618 updated_repositories.extend(mem::take(&mut repository_map).into_values());
619 }
620
621 Some(UpdateWorktree {
622 project_id: message.project_id,
623 worktree_id: message.worktree_id,
624 root_name: message.root_name.clone(),
625 abs_path: message.abs_path.clone(),
626 updated_entries,
627 removed_entries,
628 scan_id: message.scan_id,
629 is_last_update: done_files && message.is_last_update,
630 updated_repositories,
631 removed_repositories,
632 })
633 })
634}
635
636#[cfg(test)]
637mod tests {
638 use super::*;
639
640 #[gpui::test]
641 async fn test_buffer_size() {
642 let (tx, rx) = futures::channel::mpsc::unbounded();
643 let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
644 sink.write(Message::Envelope(Envelope {
645 payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
646 root_name: "abcdefg".repeat(10),
647 ..Default::default()
648 })),
649 ..Default::default()
650 }))
651 .await
652 .unwrap();
653 assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
654 sink.write(Message::Envelope(Envelope {
655 payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
656 root_name: "abcdefg".repeat(1000000),
657 ..Default::default()
658 })),
659 ..Default::default()
660 }))
661 .await
662 .unwrap();
663 assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
664
665 let mut stream = MessageStream::new(rx.map(anyhow::Ok));
666 stream.read().await.unwrap();
667 assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
668 stream.read().await.unwrap();
669 assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
670 }
671
672 #[gpui::test]
673 fn test_converting_peer_id_from_and_to_u64() {
674 let peer_id = PeerId {
675 owner_id: 10,
676 id: 3,
677 };
678 assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
679 let peer_id = PeerId {
680 owner_id: u32::MAX,
681 id: 3,
682 };
683 assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
684 let peer_id = PeerId {
685 owner_id: 10,
686 id: u32::MAX,
687 };
688 assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
689 let peer_id = PeerId {
690 owner_id: u32::MAX,
691 id: u32::MAX,
692 };
693 assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
694 }
695}