1#![allow(non_snake_case)]
2
3use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
4use anyhow::{anyhow, Result};
5use async_tungstenite::tungstenite::Message as WebSocketMessage;
6use collections::HashMap;
7use futures::{SinkExt as _, StreamExt as _};
8use prost::Message as _;
9use serde::Serialize;
10use std::any::{Any, TypeId};
11use std::{
12 cmp,
13 fmt::Debug,
14 io, iter,
15 time::{Duration, SystemTime, UNIX_EPOCH},
16};
17use std::{fmt, mem};
18
19include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
20
/// A protobuf message type that can be wrapped in, and extracted from, an
/// [`Envelope`].
///
/// Implementors must be cloneable, debuggable, serializable, thread-safe and
/// `'static`.
pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
    /// Name of the message type; surfaced through
    /// [`AnyTypedEnvelope::payload_type_name`].
    const NAME: &'static str;
    /// Priority class of the message type; [`AnyTypedEnvelope::is_background`]
    /// reports whether this is [`MessagePriority::Background`].
    const PRIORITY: MessagePriority;
    /// Consumes the message and wraps it in an [`Envelope`].
    ///
    /// NOTE(review): `id`, `responding_to` and `original_sender_id` presumably
    /// populate the matching envelope header fields — confirm against the
    /// macro that generates the implementations.
    fn into_envelope(
        self,
        id: u32,
        responding_to: Option<u32>,
        original_sender_id: Option<PeerId>,
    ) -> Envelope;
    /// Tries to extract a message of this type from `envelope`.
    ///
    /// Returns `None` when no such message can be produced (presumably when
    /// the envelope carries a different payload type — confirm).
    fn from_envelope(envelope: Envelope) -> Option<Self>;
}
32
/// An [`EnvelopedMessage`] that is addressed to a particular remote entity.
pub trait EntityMessage: EnvelopedMessage {
    /// Returns the id of the remote entity this message refers to.
    fn remote_entity_id(&self) -> u64;
}
36
/// An [`EnvelopedMessage`] that has an associated response message type.
pub trait RequestMessage: EnvelopedMessage {
    /// The message type associated with this request as its response.
    type Response: EnvelopedMessage;
}
40
/// Type-erased view of a `TypedEnvelope<T>`, exposing its metadata without
/// naming the payload type `T`.
pub trait AnyTypedEnvelope: 'static + Send + Sync {
    /// [`TypeId`] of the payload type.
    fn payload_type_id(&self) -> TypeId;
    /// Name of the payload type.
    fn payload_type_name(&self) -> &'static str;
    /// Borrows the envelope as [`Any`] so callers can downcast by reference.
    fn as_any(&self) -> &dyn Any;
    /// Converts the boxed envelope into a boxed [`Any`] so callers can
    /// downcast by value.
    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
    /// Whether the payload type has background priority.
    fn is_background(&self) -> bool;
    /// The original sender's peer id, if the envelope carries one.
    fn original_sender_id(&self) -> Option<PeerId>;
    /// Id of the connection recorded as the sender of the envelope.
    fn sender_id(&self) -> ConnectionId;
    /// Id of the message carried by the envelope.
    fn message_id(&self) -> u32;
}
51
/// Priority class assigned to each message type via
/// [`EnvelopedMessage::PRIORITY`].
///
/// NOTE(review): how the two classes are scheduled is decided by code outside
/// this file — confirm before relying on any ordering guarantees.
pub enum MessagePriority {
    /// Foreground priority.
    Foreground,
    /// Background priority; [`AnyTypedEnvelope::is_background`] returns `true`
    /// for message types using this variant.
    Background,
}
56
57impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
58 fn payload_type_id(&self) -> TypeId {
59 TypeId::of::<T>()
60 }
61
62 fn payload_type_name(&self) -> &'static str {
63 T::NAME
64 }
65
66 fn as_any(&self) -> &dyn Any {
67 self
68 }
69
70 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
71 self
72 }
73
74 fn is_background(&self) -> bool {
75 matches!(T::PRIORITY, MessagePriority::Background)
76 }
77
78 fn original_sender_id(&self) -> Option<PeerId> {
79 self.original_sender_id
80 }
81
82 fn sender_id(&self) -> ConnectionId {
83 self.sender_id
84 }
85
86 fn message_id(&self) -> u32 {
87 self.message_id
88 }
89}
90
91impl PeerId {
92 pub fn from_u64(peer_id: u64) -> Self {
93 let owner_id = (peer_id >> 32) as u32;
94 let id = peer_id as u32;
95 Self { owner_id, id }
96 }
97
98 pub fn as_u64(self) -> u64 {
99 ((self.owner_id as u64) << 32) | (self.id as u64)
100 }
101}
102
// `PeerId` is passed and returned by value throughout this file
// (e.g. `PeerId::as_u64(self)`), so it is marked `Copy`.
impl Copy for PeerId {}
104
// Marks `PeerId` equality as a full equivalence relation; required by the
// `Ord` implementation below.
impl Eq for PeerId {}
106
107impl Ord for PeerId {
108 fn cmp(&self, other: &Self) -> cmp::Ordering {
109 self.owner_id
110 .cmp(&other.owner_id)
111 .then_with(|| self.id.cmp(&other.id))
112 }
113}
114
115impl PartialOrd for PeerId {
116 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
117 Some(self.cmp(other))
118 }
119}
120
121impl std::hash::Hash for PeerId {
122 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
123 self.owner_id.hash(state);
124 self.id.hash(state);
125 }
126}
127
128impl fmt::Display for PeerId {
129 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130 write!(f, "{}/{}", self.owner_id, self.id)
131 }
132}
133
// Registers every message type with the `messages!` macro (imported from the
// parent module), pairing each type with a `MessagePriority` variant name.
// NOTE(review): the macro body is not visible from this file; presumably it
// implements `EnvelopedMessage` for each listed type with the given priority —
// confirm against the macro definition.
messages!(
    (Ack, Foreground),
    (AckBufferOperation, Background),
    (AckChannelMessage, Background),
    (AddNotification, Foreground),
    (AddProjectCollaborator, Foreground),
    (ApplyCodeAction, Background),
    (ApplyCodeActionResponse, Background),
    (ApplyCompletionAdditionalEdits, Background),
    (ApplyCompletionAdditionalEditsResponse, Background),
    (BufferReloaded, Foreground),
    (BufferSaved, Foreground),
    (Call, Foreground),
    (CallCanceled, Foreground),
    (CancelCall, Foreground),
    (ChannelMessageSent, Foreground),
    (CopyProjectEntry, Foreground),
    (CreateBufferForPeer, Foreground),
    (CreateChannel, Foreground),
    (CreateChannelResponse, Foreground),
    (CreateProjectEntry, Foreground),
    (CreateRoom, Foreground),
    (CreateRoomResponse, Foreground),
    (DeclineCall, Foreground),
    (DeleteChannel, Foreground),
    (DeleteNotification, Foreground),
    (DeleteProjectEntry, Foreground),
    (Error, Foreground),
    (ExpandProjectEntry, Foreground),
    (ExpandProjectEntryResponse, Foreground),
    (Follow, Foreground),
    (FollowResponse, Foreground),
    (FormatBuffers, Foreground),
    (FormatBuffersResponse, Foreground),
    (FuzzySearchUsers, Foreground),
    (GetChannelMembers, Foreground),
    (GetChannelMembersResponse, Foreground),
    (GetChannelMessages, Background),
    (GetChannelMessagesById, Background),
    (GetChannelMessagesResponse, Background),
    (GetCodeActions, Background),
    (GetCodeActionsResponse, Background),
    (GetCompletions, Background),
    (GetCompletionsResponse, Background),
    (GetDefinition, Background),
    (GetDefinitionResponse, Background),
    (GetDocumentHighlights, Background),
    (GetDocumentHighlightsResponse, Background),
    (GetHover, Background),
    (GetHoverResponse, Background),
    (GetNotifications, Foreground),
    (GetNotificationsResponse, Foreground),
    (GetPrivateUserInfo, Foreground),
    (GetPrivateUserInfoResponse, Foreground),
    (GetProjectSymbols, Background),
    (GetProjectSymbolsResponse, Background),
    (GetReferences, Background),
    (GetReferencesResponse, Background),
    (GetTypeDefinition, Background),
    (GetTypeDefinitionResponse, Background),
    (GetUsers, Foreground),
    (Hello, Foreground),
    (IncomingCall, Foreground),
    (InlayHints, Background),
    (InlayHintsResponse, Background),
    (InviteChannelMember, Foreground),
    (JoinChannel, Foreground),
    (JoinChannelBuffer, Foreground),
    (JoinChannelBufferResponse, Foreground),
    (JoinChannelChat, Foreground),
    (JoinChannelChatResponse, Foreground),
    (JoinProject, Foreground),
    (JoinProjectResponse, Foreground),
    (JoinRoom, Foreground),
    (JoinRoomResponse, Foreground),
    (LeaveChannelBuffer, Background),
    (LeaveChannelChat, Foreground),
    (LeaveProject, Foreground),
    (LeaveRoom, Foreground),
    (MarkNotificationRead, Foreground),
    (MoveChannel, Foreground),
    (OnTypeFormatting, Background),
    (OnTypeFormattingResponse, Background),
    (OpenBufferById, Background),
    (OpenBufferByPath, Background),
    (OpenBufferForSymbol, Background),
    (OpenBufferForSymbolResponse, Background),
    (OpenBufferResponse, Background),
    (PerformRename, Background),
    (PerformRenameResponse, Background),
    (Ping, Foreground),
    (PrepareRename, Background),
    (PrepareRenameResponse, Background),
    (ProjectEntryResponse, Foreground),
    (RefreshInlayHints, Foreground),
    (RejoinChannelBuffers, Foreground),
    (RejoinChannelBuffersResponse, Foreground),
    (RejoinRoom, Foreground),
    (RejoinRoomResponse, Foreground),
    (ReloadBuffers, Foreground),
    (ReloadBuffersResponse, Foreground),
    (RemoveChannelMember, Foreground),
    (RemoveChannelMessage, Foreground),
    (RemoveContact, Foreground),
    (RemoveProjectCollaborator, Foreground),
    (RenameChannel, Foreground),
    (RenameChannelResponse, Foreground),
    (RenameProjectEntry, Foreground),
    (RequestContact, Foreground),
    (ResolveCompletionDocumentation, Background),
    (ResolveCompletionDocumentationResponse, Background),
    (ResolveInlayHint, Background),
    (ResolveInlayHintResponse, Background),
    (RespondToChannelInvite, Foreground),
    (RespondToContactRequest, Foreground),
    (RoomUpdated, Foreground),
    (SaveBuffer, Foreground),
    (SetChannelMemberRole, Foreground),
    (SetChannelVisibility, Foreground),
    (SearchProject, Background),
    (SearchProjectResponse, Background),
    (SendChannelMessage, Background),
    (SendChannelMessageResponse, Background),
    (ShareProject, Foreground),
    (ShareProjectResponse, Foreground),
    (ShowContacts, Foreground),
    (StartLanguageServer, Foreground),
    (SynchronizeBuffers, Foreground),
    (SynchronizeBuffersResponse, Foreground),
    (Test, Foreground),
    (Unfollow, Foreground),
    (UnshareProject, Foreground),
    (UpdateBuffer, Foreground),
    (UpdateBufferFile, Foreground),
    (UpdateChannelBuffer, Foreground),
    (UpdateChannelBufferCollaborators, Foreground),
    (UpdateChannels, Foreground),
    (UpdateContacts, Foreground),
    (UpdateDiagnosticSummary, Foreground),
    (UpdateDiffBase, Foreground),
    (UpdateFollowers, Foreground),
    (UpdateInviteInfo, Foreground),
    (UpdateLanguageServer, Foreground),
    (UpdateParticipantLocation, Foreground),
    (UpdateProject, Foreground),
    (UpdateProjectCollaborator, Foreground),
    (UpdateWorktree, Foreground),
    (UpdateWorktreeSettings, Foreground),
    (UsersResponse, Foreground),
);
284
// Pairs each request message type with the message type used as its response,
// via the `request_messages!` macro imported from the parent module. Several
// requests share a response type (e.g. `Ack`, `ProjectEntryResponse`,
// `UsersResponse`).
// NOTE(review): the macro body is not visible from this file; presumably it
// implements `RequestMessage` with `Response` set to the second element —
// confirm against the macro definition.
request_messages!(
    (ApplyCodeAction, ApplyCodeActionResponse),
    (
        ApplyCompletionAdditionalEdits,
        ApplyCompletionAdditionalEditsResponse
    ),
    (Call, Ack),
    (CancelCall, Ack),
    (CopyProjectEntry, ProjectEntryResponse),
    (CreateChannel, CreateChannelResponse),
    (CreateProjectEntry, ProjectEntryResponse),
    (CreateRoom, CreateRoomResponse),
    (DeclineCall, Ack),
    (DeleteChannel, Ack),
    (DeleteProjectEntry, ProjectEntryResponse),
    (ExpandProjectEntry, ExpandProjectEntryResponse),
    (Follow, FollowResponse),
    (FormatBuffers, FormatBuffersResponse),
    (FuzzySearchUsers, UsersResponse),
    (GetChannelMembers, GetChannelMembersResponse),
    (GetChannelMessages, GetChannelMessagesResponse),
    (GetChannelMessagesById, GetChannelMessagesResponse),
    (GetCodeActions, GetCodeActionsResponse),
    (GetCompletions, GetCompletionsResponse),
    (GetDefinition, GetDefinitionResponse),
    (GetDocumentHighlights, GetDocumentHighlightsResponse),
    (GetHover, GetHoverResponse),
    (GetNotifications, GetNotificationsResponse),
    (GetPrivateUserInfo, GetPrivateUserInfoResponse),
    (GetProjectSymbols, GetProjectSymbolsResponse),
    (GetReferences, GetReferencesResponse),
    (GetTypeDefinition, GetTypeDefinitionResponse),
    (GetUsers, UsersResponse),
    (IncomingCall, Ack),
    (InlayHints, InlayHintsResponse),
    (InviteChannelMember, Ack),
    (JoinChannel, JoinRoomResponse),
    (JoinChannelBuffer, JoinChannelBufferResponse),
    (JoinChannelChat, JoinChannelChatResponse),
    (JoinProject, JoinProjectResponse),
    (JoinRoom, JoinRoomResponse),
    (LeaveChannelBuffer, Ack),
    (LeaveRoom, Ack),
    (MarkNotificationRead, Ack),
    (MoveChannel, Ack),
    (OnTypeFormatting, OnTypeFormattingResponse),
    (OpenBufferById, OpenBufferResponse),
    (OpenBufferByPath, OpenBufferResponse),
    (OpenBufferForSymbol, OpenBufferForSymbolResponse),
    (PerformRename, PerformRenameResponse),
    (Ping, Ack),
    (PrepareRename, PrepareRenameResponse),
    (RefreshInlayHints, Ack),
    (RejoinChannelBuffers, RejoinChannelBuffersResponse),
    (RejoinRoom, RejoinRoomResponse),
    (ReloadBuffers, ReloadBuffersResponse),
    (RemoveChannelMember, Ack),
    (RemoveChannelMessage, Ack),
    (RemoveContact, Ack),
    (RenameChannel, RenameChannelResponse),
    (RenameProjectEntry, ProjectEntryResponse),
    (RequestContact, Ack),
    (
        ResolveCompletionDocumentation,
        ResolveCompletionDocumentationResponse
    ),
    (ResolveInlayHint, ResolveInlayHintResponse),
    (RespondToChannelInvite, Ack),
    (RespondToContactRequest, Ack),
    (SaveBuffer, BufferSaved),
    (SearchProject, SearchProjectResponse),
    (SendChannelMessage, SendChannelMessageResponse),
    (SetChannelMemberRole, Ack),
    (SetChannelVisibility, Ack),
    (ShareProject, ShareProjectResponse),
    (SynchronizeBuffers, SynchronizeBuffersResponse),
    (Test, Test),
    (UpdateBuffer, Ack),
    (UpdateParticipantLocation, Ack),
    (UpdateProject, Ack),
    (UpdateWorktree, Ack),
);
367
// Lists the message types handled as project-scoped entity messages. The first
// argument, `project_id`, names a field; the remaining arguments are message
// types.
// NOTE(review): the macro body is not visible from this file; presumably it
// implements `EntityMessage::remote_entity_id` by returning that field —
// confirm against the macro definition.
entity_messages!(
    project_id,
    AddProjectCollaborator,
    ApplyCodeAction,
    ApplyCompletionAdditionalEdits,
    BufferReloaded,
    BufferSaved,
    CopyProjectEntry,
    CreateBufferForPeer,
    CreateProjectEntry,
    DeleteProjectEntry,
    ExpandProjectEntry,
    FormatBuffers,
    GetCodeActions,
    GetCompletions,
    GetDefinition,
    GetDocumentHighlights,
    GetHover,
    GetProjectSymbols,
    GetReferences,
    GetTypeDefinition,
    InlayHints,
    JoinProject,
    LeaveProject,
    OnTypeFormatting,
    OpenBufferById,
    OpenBufferByPath,
    OpenBufferForSymbol,
    PerformRename,
    PrepareRename,
    RefreshInlayHints,
    ReloadBuffers,
    RemoveProjectCollaborator,
    RenameProjectEntry,
    ResolveCompletionDocumentation,
    ResolveInlayHint,
    SaveBuffer,
    SearchProject,
    StartLanguageServer,
    SynchronizeBuffers,
    UnshareProject,
    UpdateBuffer,
    UpdateBufferFile,
    UpdateDiagnosticSummary,
    UpdateDiffBase,
    UpdateLanguageServer,
    UpdateProject,
    UpdateProjectCollaborator,
    UpdateWorktree,
    UpdateWorktreeSettings,
);
419
// Lists the message types handled as channel-scoped entity messages. The first
// argument, `channel_id`, names a field; the remaining arguments are message
// types.
// NOTE(review): presumably the macro implements
// `EntityMessage::remote_entity_id` by returning that field — confirm against
// the macro definition in the parent module.
entity_messages!(
    channel_id,
    ChannelMessageSent,
    RemoveChannelMessage,
    UpdateChannelBuffer,
    UpdateChannelBufferCollaborators,
);
427
/// One kibibyte, in bytes.
const KIB: usize = 1024;
/// One mebibyte, in bytes.
const MIB: usize = KIB * 1024;
/// Capacity, in bytes, that `MessageStream`'s reusable encoding buffer is
/// shrunk back to after each message, so one oversized message does not pin a
/// large allocation.
const MAX_BUFFER_LEN: usize = MIB;
431
/// A stream of protobuf messages.
///
/// Wraps an underlying transport `S` and converts between [`Message`] values
/// and WebSocket frames; envelopes travel as zstd-compressed protobuf bytes.
pub struct MessageStream<S> {
    /// The wrapped transport (a sink and/or stream of WebSocket messages).
    stream: S,
    /// Scratch buffer reused across calls: holds the uncompressed protobuf
    /// bytes while writing, and the decompressed bytes while reading.
    encoding_buffer: Vec<u8>,
}
437
/// A message exchanged over a [`MessageStream`].
// NOTE(review): the lint is silenced presumably because `Envelope` is far
// larger than the unit variants — confirm before boxing it.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum Message {
    /// A protobuf envelope, sent as a binary WebSocket frame.
    Envelope(Envelope),
    /// A WebSocket ping frame.
    Ping,
    /// A WebSocket pong frame.
    Pong,
}
445
446impl<S> MessageStream<S> {
447 pub fn new(stream: S) -> Self {
448 Self {
449 stream,
450 encoding_buffer: Vec::new(),
451 }
452 }
453
454 pub fn inner_mut(&mut self) -> &mut S {
455 &mut self.stream
456 }
457}
458
459impl<S> MessageStream<S>
460where
461 S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
462{
463 pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
464 #[cfg(any(test, feature = "test-support"))]
465 const COMPRESSION_LEVEL: i32 = -7;
466
467 #[cfg(not(any(test, feature = "test-support")))]
468 const COMPRESSION_LEVEL: i32 = 4;
469
470 match message {
471 Message::Envelope(message) => {
472 self.encoding_buffer.reserve(message.encoded_len());
473 message
474 .encode(&mut self.encoding_buffer)
475 .map_err(io::Error::from)?;
476 let buffer =
477 zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
478 .unwrap();
479
480 self.encoding_buffer.clear();
481 self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
482 self.stream.send(WebSocketMessage::Binary(buffer)).await?;
483 }
484 Message::Ping => {
485 self.stream
486 .send(WebSocketMessage::Ping(Default::default()))
487 .await?;
488 }
489 Message::Pong => {
490 self.stream
491 .send(WebSocketMessage::Pong(Default::default()))
492 .await?;
493 }
494 }
495
496 Ok(())
497 }
498}
499
500impl<S> MessageStream<S>
501where
502 S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
503{
504 pub async fn read(&mut self) -> Result<Message, anyhow::Error> {
505 while let Some(bytes) = self.stream.next().await {
506 match bytes? {
507 WebSocketMessage::Binary(bytes) => {
508 zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
509 let envelope = Envelope::decode(self.encoding_buffer.as_slice())
510 .map_err(io::Error::from)?;
511
512 self.encoding_buffer.clear();
513 self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
514 return Ok(Message::Envelope(envelope));
515 }
516 WebSocketMessage::Ping(_) => return Ok(Message::Ping),
517 WebSocketMessage::Pong(_) => return Ok(Message::Pong),
518 WebSocketMessage::Close(_) => break,
519 _ => {}
520 }
521 }
522 Err(anyhow!("connection closed"))
523 }
524}
525
526impl From<Timestamp> for SystemTime {
527 fn from(val: Timestamp) -> Self {
528 UNIX_EPOCH
529 .checked_add(Duration::new(val.seconds, val.nanos))
530 .unwrap()
531 }
532}
533
534impl From<SystemTime> for Timestamp {
535 fn from(time: SystemTime) -> Self {
536 let duration = time.duration_since(UNIX_EPOCH).unwrap();
537 Self {
538 seconds: duration.as_secs(),
539 nanos: duration.subsec_nanos(),
540 }
541 }
542}
543
544impl From<u128> for Nonce {
545 fn from(nonce: u128) -> Self {
546 let upper_half = (nonce >> 64) as u64;
547 let lower_half = nonce as u64;
548 Self {
549 upper_half,
550 lower_half,
551 }
552 }
553}
554
555impl From<Nonce> for u128 {
556 fn from(nonce: Nonce) -> Self {
557 let upper_half = (nonce.upper_half as u128) << 64;
558 let lower_half = nonce.lower_half as u128;
559 upper_half | lower_half
560 }
561}
562
563pub fn split_worktree_update(
564 mut message: UpdateWorktree,
565 max_chunk_size: usize,
566) -> impl Iterator<Item = UpdateWorktree> {
567 let mut done_files = false;
568
569 let mut repository_map = message
570 .updated_repositories
571 .into_iter()
572 .map(|repo| (repo.work_directory_id, repo))
573 .collect::<HashMap<_, _>>();
574
575 iter::from_fn(move || {
576 if done_files {
577 return None;
578 }
579
580 let updated_entries_chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
581 let updated_entries: Vec<_> = message
582 .updated_entries
583 .drain(..updated_entries_chunk_size)
584 .collect();
585
586 let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size);
587 let removed_entries = message
588 .removed_entries
589 .drain(..removed_entries_chunk_size)
590 .collect();
591
592 done_files = message.updated_entries.is_empty() && message.removed_entries.is_empty();
593
594 let mut updated_repositories = Vec::new();
595
596 if !repository_map.is_empty() {
597 for entry in &updated_entries {
598 if let Some(repo) = repository_map.remove(&entry.id) {
599 updated_repositories.push(repo)
600 }
601 }
602 }
603
604 let removed_repositories = if done_files {
605 mem::take(&mut message.removed_repositories)
606 } else {
607 Default::default()
608 };
609
610 if done_files {
611 updated_repositories.extend(mem::take(&mut repository_map).into_values());
612 }
613
614 Some(UpdateWorktree {
615 project_id: message.project_id,
616 worktree_id: message.worktree_id,
617 root_name: message.root_name.clone(),
618 abs_path: message.abs_path.clone(),
619 updated_entries,
620 removed_entries,
621 scan_id: message.scan_id,
622 is_last_update: done_files && message.is_last_update,
623 updated_repositories,
624 removed_repositories,
625 })
626 })
627}
628
#[cfg(test)]
mod tests {
    use super::*;

    // Checks that the reusable encoding buffer is shrunk back to at most
    // `MAX_BUFFER_LEN` after writing and after reading, for both a small
    // message and one far larger than the limit.
    #[gpui::test]
    async fn test_buffer_size() {
        let (tx, rx) = futures::channel::mpsc::unbounded();
        let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
        // Small message: 70 bytes of root name.
        sink.write(Message::Envelope(Envelope {
            payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
                root_name: "abcdefg".repeat(10),
                ..Default::default()
            })),
            ..Default::default()
        }))
        .await
        .unwrap();
        assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
        // Large message: 7,000,000 bytes of root name, well above the limit.
        sink.write(Message::Envelope(Envelope {
            payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
                root_name: "abcdefg".repeat(1000000),
                ..Default::default()
            })),
            ..Default::default()
        }))
        .await
        .unwrap();
        assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);

        // Read both messages back through the receiving half of the channel.
        let mut stream = MessageStream::new(rx.map(anyhow::Ok));
        stream.read().await.unwrap();
        assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
        stream.read().await.unwrap();
        assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
    }

    // Checks that `PeerId::as_u64` followed by `PeerId::from_u64` round-trips,
    // including the `u32::MAX` boundary for either field.
    #[gpui::test]
    fn test_converting_peer_id_from_and_to_u64() {
        let peer_id = PeerId {
            owner_id: 10,
            id: 3,
        };
        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
        let peer_id = PeerId {
            owner_id: u32::MAX,
            id: 3,
        };
        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
        let peer_id = PeerId {
            owner_id: 10,
            id: u32::MAX,
        };
        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
        let peer_id = PeerId {
            owner_id: u32::MAX,
            id: u32::MAX,
        };
        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
    }
}