1#![allow(non_snake_case)]
2
3use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
4use anyhow::{anyhow, Result};
5use async_tungstenite::tungstenite::Message as WebSocketMessage;
6use collections::HashMap;
7use futures::{SinkExt as _, StreamExt as _};
8use prost::Message as _;
9use serde::Serialize;
10use std::any::{Any, TypeId};
11use std::{
12 cmp,
13 fmt::Debug,
14 io, iter,
15 time::{Duration, SystemTime, UNIX_EPOCH},
16};
17use std::{fmt, mem};
18
19include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
20
21pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
22 const NAME: &'static str;
23 const PRIORITY: MessagePriority;
24 fn into_envelope(
25 self,
26 id: u32,
27 responding_to: Option<u32>,
28 original_sender_id: Option<PeerId>,
29 ) -> Envelope;
30 fn from_envelope(envelope: Envelope) -> Option<Self>;
31}
32
33pub trait EntityMessage: EnvelopedMessage {
34 fn remote_entity_id(&self) -> u64;
35}
36
37pub trait RequestMessage: EnvelopedMessage {
38 type Response: EnvelopedMessage;
39}
40
41pub trait AnyTypedEnvelope: 'static + Send + Sync {
42 fn payload_type_id(&self) -> TypeId;
43 fn payload_type_name(&self) -> &'static str;
44 fn as_any(&self) -> &dyn Any;
45 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
46 fn is_background(&self) -> bool;
47 fn original_sender_id(&self) -> Option<PeerId>;
48 fn sender_id(&self) -> ConnectionId;
49 fn message_id(&self) -> u32;
50}
51
/// Priority class assigned to every message type via `EnvelopedMessage::PRIORITY`.
///
/// The common value-type traits are derived so the priority can be logged,
/// compared and passed around by value.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum MessagePriority {
    /// The message is handled with foreground priority.
    Foreground,
    /// The message is handled with background priority
    /// (`AnyTypedEnvelope::is_background` returns `true`).
    Background,
}
56
57impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
58 fn payload_type_id(&self) -> TypeId {
59 TypeId::of::<T>()
60 }
61
62 fn payload_type_name(&self) -> &'static str {
63 T::NAME
64 }
65
66 fn as_any(&self) -> &dyn Any {
67 self
68 }
69
70 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
71 self
72 }
73
74 fn is_background(&self) -> bool {
75 matches!(T::PRIORITY, MessagePriority::Background)
76 }
77
78 fn original_sender_id(&self) -> Option<PeerId> {
79 self.original_sender_id
80 }
81
82 fn sender_id(&self) -> ConnectionId {
83 self.sender_id
84 }
85
86 fn message_id(&self) -> u32 {
87 self.message_id
88 }
89}
90
91impl PeerId {
92 pub fn from_u64(peer_id: u64) -> Self {
93 let owner_id = (peer_id >> 32) as u32;
94 let id = peer_id as u32;
95 Self { owner_id, id }
96 }
97
98 pub fn as_u64(self) -> u64 {
99 ((self.owner_id as u64) << 32) | (self.id as u64)
100 }
101}
102
103impl Copy for PeerId {}
104
105impl Eq for PeerId {}
106
107impl Ord for PeerId {
108 fn cmp(&self, other: &Self) -> cmp::Ordering {
109 self.owner_id
110 .cmp(&other.owner_id)
111 .then_with(|| self.id.cmp(&other.id))
112 }
113}
114
115impl PartialOrd for PeerId {
116 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
117 Some(self.cmp(other))
118 }
119}
120
121impl std::hash::Hash for PeerId {
122 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
123 self.owner_id.hash(state);
124 self.id.hash(state);
125 }
126}
127
128impl fmt::Display for PeerId {
129 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130 write!(f, "{}/{}", self.owner_id, self.id)
131 }
132}
133
134messages!(
135 (Ack, Foreground),
136 (AckBufferOperation, Background),
137 (AckChannelMessage, Background),
138 (AddProjectCollaborator, Foreground),
139 (ApplyCodeAction, Background),
140 (ApplyCodeActionResponse, Background),
141 (ApplyCompletionAdditionalEdits, Background),
142 (ApplyCompletionAdditionalEditsResponse, Background),
143 (BufferReloaded, Foreground),
144 (BufferSaved, Foreground),
145 (Call, Foreground),
146 (CallCanceled, Foreground),
147 (CancelCall, Foreground),
148 (ChannelMessageSent, Foreground),
149 (CopyProjectEntry, Foreground),
150 (CreateBufferForPeer, Foreground),
151 (CreateChannel, Foreground),
152 (CreateChannelResponse, Foreground),
153 (CreateProjectEntry, Foreground),
154 (CreateRoom, Foreground),
155 (CreateRoomResponse, Foreground),
156 (DeclineCall, Foreground),
157 (DeleteChannel, Foreground),
158 (DeleteNotification, Foreground),
159 (DeleteProjectEntry, Foreground),
160 (Error, Foreground),
161 (ExpandProjectEntry, Foreground),
162 (ExpandProjectEntryResponse, Foreground),
163 (Follow, Foreground),
164 (FollowResponse, Foreground),
165 (FormatBuffers, Foreground),
166 (FormatBuffersResponse, Foreground),
167 (FuzzySearchUsers, Foreground),
168 (GetChannelMembers, Foreground),
169 (GetChannelMembersResponse, Foreground),
170 (GetChannelMessages, Background),
171 (GetChannelMessagesById, Background),
172 (GetChannelMessagesResponse, Background),
173 (GetCodeActions, Background),
174 (GetCodeActionsResponse, Background),
175 (GetCompletions, Background),
176 (GetCompletionsResponse, Background),
177 (GetDefinition, Background),
178 (GetDefinitionResponse, Background),
179 (GetDocumentHighlights, Background),
180 (GetDocumentHighlightsResponse, Background),
181 (GetHover, Background),
182 (GetHoverResponse, Background),
183 (GetNotifications, Foreground),
184 (GetNotificationsResponse, Foreground),
185 (GetPrivateUserInfo, Foreground),
186 (GetPrivateUserInfoResponse, Foreground),
187 (GetProjectSymbols, Background),
188 (GetProjectSymbolsResponse, Background),
189 (GetReferences, Background),
190 (GetReferencesResponse, Background),
191 (GetTypeDefinition, Background),
192 (GetTypeDefinitionResponse, Background),
193 (GetUsers, Foreground),
194 (Hello, Foreground),
195 (IncomingCall, Foreground),
196 (InlayHints, Background),
197 (InlayHintsResponse, Background),
198 (InviteChannelMember, Foreground),
199 (JoinChannel, Foreground),
200 (JoinChannelBuffer, Foreground),
201 (JoinChannelBufferResponse, Foreground),
202 (JoinChannelChat, Foreground),
203 (JoinChannelChatResponse, Foreground),
204 (JoinProject, Foreground),
205 (JoinProjectResponse, Foreground),
206 (JoinRoom, Foreground),
207 (JoinRoomResponse, Foreground),
208 (LeaveChannelBuffer, Background),
209 (LeaveChannelChat, Foreground),
210 (LeaveProject, Foreground),
211 (LeaveRoom, Foreground),
212 (LinkChannel, Foreground),
213 (MarkNotificationsRead, Foreground),
214 (MoveChannel, Foreground),
215 (NewNotification, Foreground),
216 (OnTypeFormatting, Background),
217 (OnTypeFormattingResponse, Background),
218 (OpenBufferById, Background),
219 (OpenBufferByPath, Background),
220 (OpenBufferForSymbol, Background),
221 (OpenBufferForSymbolResponse, Background),
222 (OpenBufferResponse, Background),
223 (PerformRename, Background),
224 (PerformRenameResponse, Background),
225 (Ping, Foreground),
226 (PrepareRename, Background),
227 (PrepareRenameResponse, Background),
228 (ProjectEntryResponse, Foreground),
229 (RefreshInlayHints, Foreground),
230 (RejoinChannelBuffers, Foreground),
231 (RejoinChannelBuffersResponse, Foreground),
232 (RejoinRoom, Foreground),
233 (RejoinRoomResponse, Foreground),
234 (ReloadBuffers, Foreground),
235 (ReloadBuffersResponse, Foreground),
236 (RemoveChannelMember, Foreground),
237 (RemoveChannelMessage, Foreground),
238 (RemoveContact, Foreground),
239 (RemoveProjectCollaborator, Foreground),
240 (RenameChannel, Foreground),
241 (RenameChannelResponse, Foreground),
242 (RenameProjectEntry, Foreground),
243 (RequestContact, Foreground),
244 (ResolveCompletionDocumentation, Background),
245 (ResolveCompletionDocumentationResponse, Background),
246 (ResolveInlayHint, Background),
247 (ResolveInlayHintResponse, Background),
248 (RespondToChannelInvite, Foreground),
249 (RespondToContactRequest, Foreground),
250 (RoomUpdated, Foreground),
251 (SaveBuffer, Foreground),
252 (SearchProject, Background),
253 (SearchProjectResponse, Background),
254 (SendChannelMessage, Background),
255 (SendChannelMessageResponse, Background),
256 (SetChannelMemberAdmin, Foreground),
257 (ShareProject, Foreground),
258 (ShareProjectResponse, Foreground),
259 (ShowContacts, Foreground),
260 (StartLanguageServer, Foreground),
261 (SynchronizeBuffers, Foreground),
262 (SynchronizeBuffersResponse, Foreground),
263 (Test, Foreground),
264 (Unfollow, Foreground),
265 (UnlinkChannel, Foreground),
266 (UnshareProject, Foreground),
267 (UpdateBuffer, Foreground),
268 (UpdateBufferFile, Foreground),
269 (UpdateChannelBuffer, Foreground),
270 (UpdateChannelBufferCollaborators, Foreground),
271 (UpdateChannels, Foreground),
272 (UpdateContacts, Foreground),
273 (UpdateDiagnosticSummary, Foreground),
274 (UpdateDiffBase, Foreground),
275 (UpdateFollowers, Foreground),
276 (UpdateInviteInfo, Foreground),
277 (UpdateLanguageServer, Foreground),
278 (UpdateParticipantLocation, Foreground),
279 (UpdateProject, Foreground),
280 (UpdateProjectCollaborator, Foreground),
281 (UpdateWorktree, Foreground),
282 (UpdateWorktreeSettings, Foreground),
283 (UsersResponse, Foreground),
284);
285
286request_messages!(
287 (ApplyCodeAction, ApplyCodeActionResponse),
288 (
289 ApplyCompletionAdditionalEdits,
290 ApplyCompletionAdditionalEditsResponse
291 ),
292 (Call, Ack),
293 (CancelCall, Ack),
294 (CopyProjectEntry, ProjectEntryResponse),
295 (CreateChannel, CreateChannelResponse),
296 (CreateProjectEntry, ProjectEntryResponse),
297 (CreateRoom, CreateRoomResponse),
298 (DeclineCall, Ack),
299 (DeleteChannel, Ack),
300 (DeleteProjectEntry, ProjectEntryResponse),
301 (ExpandProjectEntry, ExpandProjectEntryResponse),
302 (Follow, FollowResponse),
303 (FormatBuffers, FormatBuffersResponse),
304 (FuzzySearchUsers, UsersResponse),
305 (GetChannelMembers, GetChannelMembersResponse),
306 (GetChannelMessages, GetChannelMessagesResponse),
307 (GetChannelMessagesById, GetChannelMessagesResponse),
308 (GetCodeActions, GetCodeActionsResponse),
309 (GetCompletions, GetCompletionsResponse),
310 (GetDefinition, GetDefinitionResponse),
311 (GetDocumentHighlights, GetDocumentHighlightsResponse),
312 (GetHover, GetHoverResponse),
313 (GetNotifications, GetNotificationsResponse),
314 (GetPrivateUserInfo, GetPrivateUserInfoResponse),
315 (GetProjectSymbols, GetProjectSymbolsResponse),
316 (GetReferences, GetReferencesResponse),
317 (GetTypeDefinition, GetTypeDefinitionResponse),
318 (GetUsers, UsersResponse),
319 (IncomingCall, Ack),
320 (InlayHints, InlayHintsResponse),
321 (InviteChannelMember, Ack),
322 (JoinChannel, JoinRoomResponse),
323 (JoinChannelBuffer, JoinChannelBufferResponse),
324 (JoinChannelChat, JoinChannelChatResponse),
325 (JoinProject, JoinProjectResponse),
326 (JoinRoom, JoinRoomResponse),
327 (LeaveChannelBuffer, Ack),
328 (LeaveRoom, Ack),
329 (LinkChannel, Ack),
330 (MarkNotificationsRead, Ack),
331 (MoveChannel, Ack),
332 (OnTypeFormatting, OnTypeFormattingResponse),
333 (OpenBufferById, OpenBufferResponse),
334 (OpenBufferByPath, OpenBufferResponse),
335 (OpenBufferForSymbol, OpenBufferForSymbolResponse),
336 (PerformRename, PerformRenameResponse),
337 (Ping, Ack),
338 (PrepareRename, PrepareRenameResponse),
339 (RefreshInlayHints, Ack),
340 (RejoinChannelBuffers, RejoinChannelBuffersResponse),
341 (RejoinRoom, RejoinRoomResponse),
342 (ReloadBuffers, ReloadBuffersResponse),
343 (RemoveChannelMember, Ack),
344 (RemoveChannelMessage, Ack),
345 (RemoveContact, Ack),
346 (RenameChannel, RenameChannelResponse),
347 (RenameProjectEntry, ProjectEntryResponse),
348 (RequestContact, Ack),
349 (
350 ResolveCompletionDocumentation,
351 ResolveCompletionDocumentationResponse
352 ),
353 (ResolveInlayHint, ResolveInlayHintResponse),
354 (RespondToChannelInvite, Ack),
355 (RespondToContactRequest, Ack),
356 (SaveBuffer, BufferSaved),
357 (SearchProject, SearchProjectResponse),
358 (SendChannelMessage, SendChannelMessageResponse),
359 (SetChannelMemberAdmin, Ack),
360 (ShareProject, ShareProjectResponse),
361 (SynchronizeBuffers, SynchronizeBuffersResponse),
362 (Test, Test),
363 (UnlinkChannel, Ack),
364 (UpdateBuffer, Ack),
365 (UpdateParticipantLocation, Ack),
366 (UpdateProject, Ack),
367 (UpdateWorktree, Ack),
368);
369
370entity_messages!(
371 project_id,
372 AddProjectCollaborator,
373 ApplyCodeAction,
374 ApplyCompletionAdditionalEdits,
375 BufferReloaded,
376 BufferSaved,
377 CopyProjectEntry,
378 CreateBufferForPeer,
379 CreateProjectEntry,
380 DeleteProjectEntry,
381 ExpandProjectEntry,
382 FormatBuffers,
383 GetCodeActions,
384 GetCompletions,
385 GetDefinition,
386 GetDocumentHighlights,
387 GetHover,
388 GetProjectSymbols,
389 GetReferences,
390 GetTypeDefinition,
391 InlayHints,
392 JoinProject,
393 LeaveProject,
394 OnTypeFormatting,
395 OpenBufferById,
396 OpenBufferByPath,
397 OpenBufferForSymbol,
398 PerformRename,
399 PrepareRename,
400 RefreshInlayHints,
401 ReloadBuffers,
402 RemoveProjectCollaborator,
403 RenameProjectEntry,
404 ResolveCompletionDocumentation,
405 ResolveInlayHint,
406 SaveBuffer,
407 SearchProject,
408 StartLanguageServer,
409 SynchronizeBuffers,
410 UnshareProject,
411 UpdateBuffer,
412 UpdateBufferFile,
413 UpdateDiagnosticSummary,
414 UpdateDiffBase,
415 UpdateLanguageServer,
416 UpdateProject,
417 UpdateProjectCollaborator,
418 UpdateWorktree,
419 UpdateWorktreeSettings,
420);
421
422entity_messages!(
423 channel_id,
424 ChannelMessageSent,
425 RemoveChannelMessage,
426 UpdateChannelBuffer,
427 UpdateChannelBufferCollaborators,
428);
429
/// Bytes in one kibibyte.
const KIB: usize = 1024;
/// Bytes in one mebibyte.
const MIB: usize = 1024 * KIB;
/// Largest capacity the `MessageStream` scratch buffer keeps between messages.
const MAX_BUFFER_LEN: usize = MIB;
433
/// A stream of protobuf messages.
///
/// Wraps a transport `S` and owns a scratch buffer that is reused while
/// encoding and decoding messages.
pub struct MessageStream<S> {
    /// The wrapped transport.
    stream: S,
    /// Scratch space for uncompressed protobuf bytes, reused across calls.
    encoding_buffer: Vec<u8>,
}
439
440#[allow(clippy::large_enum_variant)]
441#[derive(Debug)]
442pub enum Message {
443 Envelope(Envelope),
444 Ping,
445 Pong,
446}
447
448impl<S> MessageStream<S> {
449 pub fn new(stream: S) -> Self {
450 Self {
451 stream,
452 encoding_buffer: Vec::new(),
453 }
454 }
455
456 pub fn inner_mut(&mut self) -> &mut S {
457 &mut self.stream
458 }
459}
460
461impl<S> MessageStream<S>
462where
463 S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
464{
465 pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
466 #[cfg(any(test, feature = "test-support"))]
467 const COMPRESSION_LEVEL: i32 = -7;
468
469 #[cfg(not(any(test, feature = "test-support")))]
470 const COMPRESSION_LEVEL: i32 = 4;
471
472 match message {
473 Message::Envelope(message) => {
474 self.encoding_buffer.reserve(message.encoded_len());
475 message
476 .encode(&mut self.encoding_buffer)
477 .map_err(io::Error::from)?;
478 let buffer =
479 zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
480 .unwrap();
481
482 self.encoding_buffer.clear();
483 self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
484 self.stream.send(WebSocketMessage::Binary(buffer)).await?;
485 }
486 Message::Ping => {
487 self.stream
488 .send(WebSocketMessage::Ping(Default::default()))
489 .await?;
490 }
491 Message::Pong => {
492 self.stream
493 .send(WebSocketMessage::Pong(Default::default()))
494 .await?;
495 }
496 }
497
498 Ok(())
499 }
500}
501
502impl<S> MessageStream<S>
503where
504 S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
505{
506 pub async fn read(&mut self) -> Result<Message, anyhow::Error> {
507 while let Some(bytes) = self.stream.next().await {
508 match bytes? {
509 WebSocketMessage::Binary(bytes) => {
510 zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
511 let envelope = Envelope::decode(self.encoding_buffer.as_slice())
512 .map_err(io::Error::from)?;
513
514 self.encoding_buffer.clear();
515 self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
516 return Ok(Message::Envelope(envelope));
517 }
518 WebSocketMessage::Ping(_) => return Ok(Message::Ping),
519 WebSocketMessage::Pong(_) => return Ok(Message::Pong),
520 WebSocketMessage::Close(_) => break,
521 _ => {}
522 }
523 }
524 Err(anyhow!("connection closed"))
525 }
526}
527
528impl From<Timestamp> for SystemTime {
529 fn from(val: Timestamp) -> Self {
530 UNIX_EPOCH
531 .checked_add(Duration::new(val.seconds, val.nanos))
532 .unwrap()
533 }
534}
535
536impl From<SystemTime> for Timestamp {
537 fn from(time: SystemTime) -> Self {
538 let duration = time.duration_since(UNIX_EPOCH).unwrap();
539 Self {
540 seconds: duration.as_secs(),
541 nanos: duration.subsec_nanos(),
542 }
543 }
544}
545
546impl From<u128> for Nonce {
547 fn from(nonce: u128) -> Self {
548 let upper_half = (nonce >> 64) as u64;
549 let lower_half = nonce as u64;
550 Self {
551 upper_half,
552 lower_half,
553 }
554 }
555}
556
557impl From<Nonce> for u128 {
558 fn from(nonce: Nonce) -> Self {
559 let upper_half = (nonce.upper_half as u128) << 64;
560 let lower_half = nonce.lower_half as u128;
561 upper_half | lower_half
562 }
563}
564
565pub fn split_worktree_update(
566 mut message: UpdateWorktree,
567 max_chunk_size: usize,
568) -> impl Iterator<Item = UpdateWorktree> {
569 let mut done_files = false;
570
571 let mut repository_map = message
572 .updated_repositories
573 .into_iter()
574 .map(|repo| (repo.work_directory_id, repo))
575 .collect::<HashMap<_, _>>();
576
577 iter::from_fn(move || {
578 if done_files {
579 return None;
580 }
581
582 let updated_entries_chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
583 let updated_entries: Vec<_> = message
584 .updated_entries
585 .drain(..updated_entries_chunk_size)
586 .collect();
587
588 let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size);
589 let removed_entries = message
590 .removed_entries
591 .drain(..removed_entries_chunk_size)
592 .collect();
593
594 done_files = message.updated_entries.is_empty() && message.removed_entries.is_empty();
595
596 let mut updated_repositories = Vec::new();
597
598 if !repository_map.is_empty() {
599 for entry in &updated_entries {
600 if let Some(repo) = repository_map.remove(&entry.id) {
601 updated_repositories.push(repo)
602 }
603 }
604 }
605
606 let removed_repositories = if done_files {
607 mem::take(&mut message.removed_repositories)
608 } else {
609 Default::default()
610 };
611
612 if done_files {
613 updated_repositories.extend(mem::take(&mut repository_map).into_values());
614 }
615
616 Some(UpdateWorktree {
617 project_id: message.project_id,
618 worktree_id: message.worktree_id,
619 root_name: message.root_name.clone(),
620 abs_path: message.abs_path.clone(),
621 updated_entries,
622 removed_entries,
623 scan_id: message.scan_id,
624 is_last_update: done_files && message.is_last_update,
625 updated_repositories,
626 removed_repositories,
627 })
628 })
629}
630
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an envelope whose `UpdateWorktree` payload has a root name made
    /// of `repetitions` copies of `"abcdefg"`.
    fn worktree_message(repetitions: usize) -> Message {
        Message::Envelope(Envelope {
            payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
                root_name: "abcdefg".repeat(repetitions),
                ..Default::default()
            })),
            ..Default::default()
        })
    }

    /// The scratch buffer must never stay above `MAX_BUFFER_LEN`, whether the
    /// message just written or read was small or large.
    #[gpui::test]
    async fn test_buffer_size() {
        let (tx, rx) = futures::channel::mpsc::unbounded();

        let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
        for repetitions in [10, 1000000] {
            sink.write(worktree_message(repetitions)).await.unwrap();
            assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
        }

        let mut stream = MessageStream::new(rx.map(anyhow::Ok));
        for _ in 0..2 {
            stream.read().await.unwrap();
            assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
        }
    }

    /// Packing a `PeerId` into a `u64` and unpacking it again is lossless,
    /// including at the extremes of both components.
    #[gpui::test]
    fn test_converting_peer_id_from_and_to_u64() {
        let cases = [
            (10, 3),
            (u32::MAX, 3),
            (10, u32::MAX),
            (u32::MAX, u32::MAX),
        ];
        for (owner_id, id) in cases {
            let peer_id = PeerId { owner_id, id };
            assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
        }
    }
}