1#![allow(non_snake_case)]
2
3use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
4use anyhow::{anyhow, Result};
5use async_tungstenite::tungstenite::Message as WebSocketMessage;
6use collections::HashMap;
7use futures::{SinkExt as _, StreamExt as _};
8use prost::Message as _;
9use serde::Serialize;
10use std::any::{Any, TypeId};
11use std::{
12 cmp,
13 fmt::Debug,
14 io, iter,
15 time::{Duration, SystemTime, UNIX_EPOCH},
16};
17use std::{fmt, mem};
18
19include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
20
21pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
22 const NAME: &'static str;
23 const PRIORITY: MessagePriority;
24 fn into_envelope(
25 self,
26 id: u32,
27 responding_to: Option<u32>,
28 original_sender_id: Option<PeerId>,
29 ) -> Envelope;
30 fn from_envelope(envelope: Envelope) -> Option<Self>;
31}
32
33pub trait EntityMessage: EnvelopedMessage {
34 fn remote_entity_id(&self) -> u64;
35}
36
37pub trait RequestMessage: EnvelopedMessage {
38 type Response: EnvelopedMessage;
39}
40
41pub trait AnyTypedEnvelope: 'static + Send + Sync {
42 fn payload_type_id(&self) -> TypeId;
43 fn payload_type_name(&self) -> &'static str;
44 fn as_any(&self) -> &dyn Any;
45 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
46 fn is_background(&self) -> bool;
47 fn original_sender_id(&self) -> Option<PeerId>;
48 fn sender_id(&self) -> ConnectionId;
49 fn message_id(&self) -> u32;
50}
51
52pub enum MessagePriority {
53 Foreground,
54 Background,
55}
56
57impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
58 fn payload_type_id(&self) -> TypeId {
59 TypeId::of::<T>()
60 }
61
62 fn payload_type_name(&self) -> &'static str {
63 T::NAME
64 }
65
66 fn as_any(&self) -> &dyn Any {
67 self
68 }
69
70 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
71 self
72 }
73
74 fn is_background(&self) -> bool {
75 matches!(T::PRIORITY, MessagePriority::Background)
76 }
77
78 fn original_sender_id(&self) -> Option<PeerId> {
79 self.original_sender_id
80 }
81
82 fn sender_id(&self) -> ConnectionId {
83 self.sender_id
84 }
85
86 fn message_id(&self) -> u32 {
87 self.message_id
88 }
89}
90
91impl PeerId {
92 pub fn from_u64(peer_id: u64) -> Self {
93 let owner_id = (peer_id >> 32) as u32;
94 let id = peer_id as u32;
95 Self { owner_id, id }
96 }
97
98 pub fn as_u64(self) -> u64 {
99 ((self.owner_id as u64) << 32) | (self.id as u64)
100 }
101}
102
103impl Copy for PeerId {}
104
105impl Eq for PeerId {}
106
107impl Ord for PeerId {
108 fn cmp(&self, other: &Self) -> cmp::Ordering {
109 self.owner_id
110 .cmp(&other.owner_id)
111 .then_with(|| self.id.cmp(&other.id))
112 }
113}
114
115impl PartialOrd for PeerId {
116 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
117 Some(self.cmp(other))
118 }
119}
120
121impl std::hash::Hash for PeerId {
122 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
123 self.owner_id.hash(state);
124 self.id.hash(state);
125 }
126}
127
128impl fmt::Display for PeerId {
129 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130 write!(f, "{}/{}", self.owner_id, self.id)
131 }
132}
133
134messages!(
135 (Ack, Foreground),
136 (AckBufferOperation, Background),
137 (AckChannelMessage, Background),
138 (AddNotification, Foreground),
139 (AddProjectCollaborator, Foreground),
140 (ApplyCodeAction, Background),
141 (ApplyCodeActionResponse, Background),
142 (ApplyCompletionAdditionalEdits, Background),
143 (ApplyCompletionAdditionalEditsResponse, Background),
144 (BufferReloaded, Foreground),
145 (BufferSaved, Foreground),
146 (Call, Foreground),
147 (CallCanceled, Foreground),
148 (CancelCall, Foreground),
149 (ChannelMessageSent, Foreground),
150 (CopyProjectEntry, Foreground),
151 (CreateBufferForPeer, Foreground),
152 (CreateChannel, Foreground),
153 (CreateChannelResponse, Foreground),
154 (CreateProjectEntry, Foreground),
155 (CreateRoom, Foreground),
156 (CreateRoomResponse, Foreground),
157 (DeclineCall, Foreground),
158 (DeleteChannel, Foreground),
159 (DeleteNotification, Foreground),
160 (DeleteProjectEntry, Foreground),
161 (Error, Foreground),
162 (ExpandProjectEntry, Foreground),
163 (ExpandProjectEntryResponse, Foreground),
164 (Follow, Foreground),
165 (FollowResponse, Foreground),
166 (FormatBuffers, Foreground),
167 (FormatBuffersResponse, Foreground),
168 (FuzzySearchUsers, Foreground),
169 (GetChannelMembers, Foreground),
170 (GetChannelMembersResponse, Foreground),
171 (GetChannelMessages, Background),
172 (GetChannelMessagesById, Background),
173 (GetChannelMessagesResponse, Background),
174 (GetCodeActions, Background),
175 (GetCodeActionsResponse, Background),
176 (GetCompletions, Background),
177 (GetCompletionsResponse, Background),
178 (GetDefinition, Background),
179 (GetDefinitionResponse, Background),
180 (GetDocumentHighlights, Background),
181 (GetDocumentHighlightsResponse, Background),
182 (GetHover, Background),
183 (GetHoverResponse, Background),
184 (GetNotifications, Foreground),
185 (GetNotificationsResponse, Foreground),
186 (GetPrivateUserInfo, Foreground),
187 (GetPrivateUserInfoResponse, Foreground),
188 (GetProjectSymbols, Background),
189 (GetProjectSymbolsResponse, Background),
190 (GetReferences, Background),
191 (GetReferencesResponse, Background),
192 (GetTypeDefinition, Background),
193 (GetTypeDefinitionResponse, Background),
194 (GetUsers, Foreground),
195 (Hello, Foreground),
196 (IncomingCall, Foreground),
197 (InlayHints, Background),
198 (InlayHintsResponse, Background),
199 (InviteChannelMember, Foreground),
200 (JoinChannel, Foreground),
201 (JoinChannelBuffer, Foreground),
202 (JoinChannelBufferResponse, Foreground),
203 (JoinChannelChat, Foreground),
204 (JoinChannelChatResponse, Foreground),
205 (JoinProject, Foreground),
206 (JoinProjectResponse, Foreground),
207 (JoinRoom, Foreground),
208 (JoinRoomResponse, Foreground),
209 (LeaveChannelBuffer, Background),
210 (LeaveChannelChat, Foreground),
211 (LeaveProject, Foreground),
212 (LeaveRoom, Foreground),
213 (MarkNotificationRead, Foreground),
214 (MoveChannel, Foreground),
215 (OnTypeFormatting, Background),
216 (OnTypeFormattingResponse, Background),
217 (OpenBufferById, Background),
218 (OpenBufferByPath, Background),
219 (OpenBufferForSymbol, Background),
220 (OpenBufferForSymbolResponse, Background),
221 (OpenBufferResponse, Background),
222 (PerformRename, Background),
223 (PerformRenameResponse, Background),
224 (Ping, Foreground),
225 (PrepareRename, Background),
226 (PrepareRenameResponse, Background),
227 (ProjectEntryResponse, Foreground),
228 (RefreshInlayHints, Foreground),
229 (RejoinChannelBuffers, Foreground),
230 (RejoinChannelBuffersResponse, Foreground),
231 (RejoinRoom, Foreground),
232 (RejoinRoomResponse, Foreground),
233 (ReloadBuffers, Foreground),
234 (ReloadBuffersResponse, Foreground),
235 (RemoveChannelMember, Foreground),
236 (RemoveChannelMessage, Foreground),
237 (RemoveContact, Foreground),
238 (RemoveProjectCollaborator, Foreground),
239 (RenameChannel, Foreground),
240 (RenameChannelResponse, Foreground),
241 (RenameProjectEntry, Foreground),
242 (RequestContact, Foreground),
243 (ResolveCompletionDocumentation, Background),
244 (ResolveCompletionDocumentationResponse, Background),
245 (ResolveInlayHint, Background),
246 (ResolveInlayHintResponse, Background),
247 (RespondToChannelInvite, Foreground),
248 (RespondToContactRequest, Foreground),
249 (RoomUpdated, Foreground),
250 (SaveBuffer, Foreground),
251 (SetChannelMemberRole, Foreground),
252 (SetChannelVisibility, Foreground),
253 (SearchProject, Background),
254 (SearchProjectResponse, Background),
255 (SendChannelMessage, Background),
256 (SendChannelMessageResponse, Background),
257 (ShareProject, Foreground),
258 (ShareProjectResponse, Foreground),
259 (ShowContacts, Foreground),
260 (StartLanguageServer, Foreground),
261 (SynchronizeBuffers, Foreground),
262 (SynchronizeBuffersResponse, Foreground),
263 (Test, Foreground),
264 (Unfollow, Foreground),
265 (UnshareProject, Foreground),
266 (UpdateBuffer, Foreground),
267 (UpdateBufferFile, Foreground),
268 (UpdateChannelBuffer, Foreground),
269 (UpdateChannelBufferCollaborators, Foreground),
270 (UpdateChannels, Foreground),
271 (UpdateContacts, Foreground),
272 (UpdateDiagnosticSummary, Foreground),
273 (UpdateDiffBase, Foreground),
274 (UpdateFollowers, Foreground),
275 (UpdateInviteInfo, Foreground),
276 (UpdateLanguageServer, Foreground),
277 (UpdateParticipantLocation, Foreground),
278 (UpdateProject, Foreground),
279 (UpdateProjectCollaborator, Foreground),
280 (UpdateWorktree, Foreground),
281 (UpdateWorktreeSettings, Foreground),
282 (UsersResponse, Foreground),
283 (LspExtExpandMacro, Background),
284 (LspExtExpandMacroResponse, Background),
285);
286
287request_messages!(
288 (ApplyCodeAction, ApplyCodeActionResponse),
289 (
290 ApplyCompletionAdditionalEdits,
291 ApplyCompletionAdditionalEditsResponse
292 ),
293 (Call, Ack),
294 (CancelCall, Ack),
295 (CopyProjectEntry, ProjectEntryResponse),
296 (CreateChannel, CreateChannelResponse),
297 (CreateProjectEntry, ProjectEntryResponse),
298 (CreateRoom, CreateRoomResponse),
299 (DeclineCall, Ack),
300 (DeleteChannel, Ack),
301 (DeleteProjectEntry, ProjectEntryResponse),
302 (ExpandProjectEntry, ExpandProjectEntryResponse),
303 (Follow, FollowResponse),
304 (FormatBuffers, FormatBuffersResponse),
305 (FuzzySearchUsers, UsersResponse),
306 (GetChannelMembers, GetChannelMembersResponse),
307 (GetChannelMessages, GetChannelMessagesResponse),
308 (GetChannelMessagesById, GetChannelMessagesResponse),
309 (GetCodeActions, GetCodeActionsResponse),
310 (GetCompletions, GetCompletionsResponse),
311 (GetDefinition, GetDefinitionResponse),
312 (GetDocumentHighlights, GetDocumentHighlightsResponse),
313 (GetHover, GetHoverResponse),
314 (GetNotifications, GetNotificationsResponse),
315 (GetPrivateUserInfo, GetPrivateUserInfoResponse),
316 (GetProjectSymbols, GetProjectSymbolsResponse),
317 (GetReferences, GetReferencesResponse),
318 (GetTypeDefinition, GetTypeDefinitionResponse),
319 (GetUsers, UsersResponse),
320 (IncomingCall, Ack),
321 (InlayHints, InlayHintsResponse),
322 (InviteChannelMember, Ack),
323 (JoinChannel, JoinRoomResponse),
324 (JoinChannelBuffer, JoinChannelBufferResponse),
325 (JoinChannelChat, JoinChannelChatResponse),
326 (JoinProject, JoinProjectResponse),
327 (JoinRoom, JoinRoomResponse),
328 (LeaveChannelBuffer, Ack),
329 (LeaveRoom, Ack),
330 (MarkNotificationRead, Ack),
331 (MoveChannel, Ack),
332 (OnTypeFormatting, OnTypeFormattingResponse),
333 (OpenBufferById, OpenBufferResponse),
334 (OpenBufferByPath, OpenBufferResponse),
335 (OpenBufferForSymbol, OpenBufferForSymbolResponse),
336 (PerformRename, PerformRenameResponse),
337 (Ping, Ack),
338 (PrepareRename, PrepareRenameResponse),
339 (RefreshInlayHints, Ack),
340 (RejoinChannelBuffers, RejoinChannelBuffersResponse),
341 (RejoinRoom, RejoinRoomResponse),
342 (ReloadBuffers, ReloadBuffersResponse),
343 (RemoveChannelMember, Ack),
344 (RemoveChannelMessage, Ack),
345 (RemoveContact, Ack),
346 (RenameChannel, RenameChannelResponse),
347 (RenameProjectEntry, ProjectEntryResponse),
348 (RequestContact, Ack),
349 (
350 ResolveCompletionDocumentation,
351 ResolveCompletionDocumentationResponse
352 ),
353 (ResolveInlayHint, ResolveInlayHintResponse),
354 (RespondToChannelInvite, Ack),
355 (RespondToContactRequest, Ack),
356 (SaveBuffer, BufferSaved),
357 (SearchProject, SearchProjectResponse),
358 (SendChannelMessage, SendChannelMessageResponse),
359 (SetChannelMemberRole, Ack),
360 (SetChannelVisibility, Ack),
361 (ShareProject, ShareProjectResponse),
362 (SynchronizeBuffers, SynchronizeBuffersResponse),
363 (Test, Test),
364 (UpdateBuffer, Ack),
365 (UpdateParticipantLocation, Ack),
366 (UpdateProject, Ack),
367 (UpdateWorktree, Ack),
368 (LspExtExpandMacro, LspExtExpandMacroResponse),
369);
370
371entity_messages!(
372 project_id,
373 AddProjectCollaborator,
374 ApplyCodeAction,
375 ApplyCompletionAdditionalEdits,
376 BufferReloaded,
377 BufferSaved,
378 CopyProjectEntry,
379 CreateBufferForPeer,
380 CreateProjectEntry,
381 DeleteProjectEntry,
382 ExpandProjectEntry,
383 FormatBuffers,
384 GetCodeActions,
385 GetCompletions,
386 GetDefinition,
387 GetDocumentHighlights,
388 GetHover,
389 GetProjectSymbols,
390 GetReferences,
391 GetTypeDefinition,
392 InlayHints,
393 JoinProject,
394 LeaveProject,
395 OnTypeFormatting,
396 OpenBufferById,
397 OpenBufferByPath,
398 OpenBufferForSymbol,
399 PerformRename,
400 PrepareRename,
401 RefreshInlayHints,
402 ReloadBuffers,
403 RemoveProjectCollaborator,
404 RenameProjectEntry,
405 ResolveCompletionDocumentation,
406 ResolveInlayHint,
407 SaveBuffer,
408 SearchProject,
409 StartLanguageServer,
410 SynchronizeBuffers,
411 UnshareProject,
412 UpdateBuffer,
413 UpdateBufferFile,
414 UpdateDiagnosticSummary,
415 UpdateDiffBase,
416 UpdateLanguageServer,
417 UpdateProject,
418 UpdateProjectCollaborator,
419 UpdateWorktree,
420 UpdateWorktreeSettings,
421 LspExtExpandMacro,
422);
423
424entity_messages!(
425 channel_id,
426 ChannelMessageSent,
427 RemoveChannelMessage,
428 UpdateChannelBuffer,
429 UpdateChannelBufferCollaborators,
430);
431
432const KIB: usize = 1024;
433const MIB: usize = KIB * 1024;
434const MAX_BUFFER_LEN: usize = MIB;
435
436/// A stream of protobuf messages.
437pub struct MessageStream<S> {
438 stream: S,
439 encoding_buffer: Vec<u8>,
440}
441
442#[allow(clippy::large_enum_variant)]
443#[derive(Debug)]
444pub enum Message {
445 Envelope(Envelope),
446 Ping,
447 Pong,
448}
449
450impl<S> MessageStream<S> {
451 pub fn new(stream: S) -> Self {
452 Self {
453 stream,
454 encoding_buffer: Vec::new(),
455 }
456 }
457
458 pub fn inner_mut(&mut self) -> &mut S {
459 &mut self.stream
460 }
461}
462
463impl<S> MessageStream<S>
464where
465 S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
466{
467 pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
468 #[cfg(any(test, feature = "test-support"))]
469 const COMPRESSION_LEVEL: i32 = -7;
470
471 #[cfg(not(any(test, feature = "test-support")))]
472 const COMPRESSION_LEVEL: i32 = 4;
473
474 match message {
475 Message::Envelope(message) => {
476 self.encoding_buffer.reserve(message.encoded_len());
477 message
478 .encode(&mut self.encoding_buffer)
479 .map_err(io::Error::from)?;
480 let buffer =
481 zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
482 .unwrap();
483
484 self.encoding_buffer.clear();
485 self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
486 self.stream.send(WebSocketMessage::Binary(buffer)).await?;
487 }
488 Message::Ping => {
489 self.stream
490 .send(WebSocketMessage::Ping(Default::default()))
491 .await?;
492 }
493 Message::Pong => {
494 self.stream
495 .send(WebSocketMessage::Pong(Default::default()))
496 .await?;
497 }
498 }
499
500 Ok(())
501 }
502}
503
504impl<S> MessageStream<S>
505where
506 S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
507{
508 pub async fn read(&mut self) -> Result<Message, anyhow::Error> {
509 while let Some(bytes) = self.stream.next().await {
510 match bytes? {
511 WebSocketMessage::Binary(bytes) => {
512 zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
513 let envelope = Envelope::decode(self.encoding_buffer.as_slice())
514 .map_err(io::Error::from)?;
515
516 self.encoding_buffer.clear();
517 self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
518 return Ok(Message::Envelope(envelope));
519 }
520 WebSocketMessage::Ping(_) => return Ok(Message::Ping),
521 WebSocketMessage::Pong(_) => return Ok(Message::Pong),
522 WebSocketMessage::Close(_) => break,
523 _ => {}
524 }
525 }
526 Err(anyhow!("connection closed"))
527 }
528}
529
530impl From<Timestamp> for SystemTime {
531 fn from(val: Timestamp) -> Self {
532 UNIX_EPOCH
533 .checked_add(Duration::new(val.seconds, val.nanos))
534 .unwrap()
535 }
536}
537
538impl From<SystemTime> for Timestamp {
539 fn from(time: SystemTime) -> Self {
540 let duration = time.duration_since(UNIX_EPOCH).unwrap();
541 Self {
542 seconds: duration.as_secs(),
543 nanos: duration.subsec_nanos(),
544 }
545 }
546}
547
548impl From<u128> for Nonce {
549 fn from(nonce: u128) -> Self {
550 let upper_half = (nonce >> 64) as u64;
551 let lower_half = nonce as u64;
552 Self {
553 upper_half,
554 lower_half,
555 }
556 }
557}
558
559impl From<Nonce> for u128 {
560 fn from(nonce: Nonce) -> Self {
561 let upper_half = (nonce.upper_half as u128) << 64;
562 let lower_half = nonce.lower_half as u128;
563 upper_half | lower_half
564 }
565}
566
567pub fn split_worktree_update(
568 mut message: UpdateWorktree,
569 max_chunk_size: usize,
570) -> impl Iterator<Item = UpdateWorktree> {
571 let mut done_files = false;
572
573 let mut repository_map = message
574 .updated_repositories
575 .into_iter()
576 .map(|repo| (repo.work_directory_id, repo))
577 .collect::<HashMap<_, _>>();
578
579 iter::from_fn(move || {
580 if done_files {
581 return None;
582 }
583
584 let updated_entries_chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
585 let updated_entries: Vec<_> = message
586 .updated_entries
587 .drain(..updated_entries_chunk_size)
588 .collect();
589
590 let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size);
591 let removed_entries = message
592 .removed_entries
593 .drain(..removed_entries_chunk_size)
594 .collect();
595
596 done_files = message.updated_entries.is_empty() && message.removed_entries.is_empty();
597
598 let mut updated_repositories = Vec::new();
599
600 if !repository_map.is_empty() {
601 for entry in &updated_entries {
602 if let Some(repo) = repository_map.remove(&entry.id) {
603 updated_repositories.push(repo)
604 }
605 }
606 }
607
608 let removed_repositories = if done_files {
609 mem::take(&mut message.removed_repositories)
610 } else {
611 Default::default()
612 };
613
614 if done_files {
615 updated_repositories.extend(mem::take(&mut repository_map).into_values());
616 }
617
618 Some(UpdateWorktree {
619 project_id: message.project_id,
620 worktree_id: message.worktree_id,
621 root_name: message.root_name.clone(),
622 abs_path: message.abs_path.clone(),
623 updated_entries,
624 removed_entries,
625 scan_id: message.scan_id,
626 is_last_update: done_files && message.is_last_update,
627 updated_repositories,
628 removed_repositories,
629 })
630 })
631}
632
633#[cfg(test)]
634mod tests {
635 use super::*;
636
637 #[gpui::test]
638 async fn test_buffer_size() {
639 let (tx, rx) = futures::channel::mpsc::unbounded();
640 let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
641 sink.write(Message::Envelope(Envelope {
642 payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
643 root_name: "abcdefg".repeat(10),
644 ..Default::default()
645 })),
646 ..Default::default()
647 }))
648 .await
649 .unwrap();
650 assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
651 sink.write(Message::Envelope(Envelope {
652 payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
653 root_name: "abcdefg".repeat(1000000),
654 ..Default::default()
655 })),
656 ..Default::default()
657 }))
658 .await
659 .unwrap();
660 assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
661
662 let mut stream = MessageStream::new(rx.map(anyhow::Ok));
663 stream.read().await.unwrap();
664 assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
665 stream.read().await.unwrap();
666 assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
667 }
668
669 #[gpui::test]
670 fn test_converting_peer_id_from_and_to_u64() {
671 let peer_id = PeerId {
672 owner_id: 10,
673 id: 3,
674 };
675 assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
676 let peer_id = PeerId {
677 owner_id: u32::MAX,
678 id: 3,
679 };
680 assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
681 let peer_id = PeerId {
682 owner_id: 10,
683 id: u32::MAX,
684 };
685 assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
686 let peer_id = PeerId {
687 owner_id: u32::MAX,
688 id: u32::MAX,
689 };
690 assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
691 }
692}