1#![allow(non_snake_case)]
2
3use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
4use anyhow::{anyhow, Result};
5use async_tungstenite::tungstenite::Message as WebSocketMessage;
6use collections::HashMap;
7use futures::{SinkExt as _, StreamExt as _};
8use prost::Message as _;
9use serde::Serialize;
10use std::any::{Any, TypeId};
11use std::{
12 cmp,
13 fmt::Debug,
14 io, iter,
15 time::{Duration, SystemTime, UNIX_EPOCH},
16};
17use std::{fmt, mem};
18
19include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
20
21pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
22 const NAME: &'static str;
23 const PRIORITY: MessagePriority;
24 fn into_envelope(
25 self,
26 id: u32,
27 responding_to: Option<u32>,
28 original_sender_id: Option<PeerId>,
29 ) -> Envelope;
30 fn from_envelope(envelope: Envelope) -> Option<Self>;
31}
32
33pub trait EntityMessage: EnvelopedMessage {
34 fn remote_entity_id(&self) -> u64;
35}
36
37pub trait RequestMessage: EnvelopedMessage {
38 type Response: EnvelopedMessage;
39}
40
41pub trait AnyTypedEnvelope: 'static + Send + Sync {
42 fn payload_type_id(&self) -> TypeId;
43 fn payload_type_name(&self) -> &'static str;
44 fn as_any(&self) -> &dyn Any;
45 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
46 fn is_background(&self) -> bool;
47 fn original_sender_id(&self) -> Option<PeerId>;
48 fn sender_id(&self) -> ConnectionId;
49 fn message_id(&self) -> u32;
50}
51
52pub enum MessagePriority {
53 Foreground,
54 Background,
55}
56
57impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
58 fn payload_type_id(&self) -> TypeId {
59 TypeId::of::<T>()
60 }
61
62 fn payload_type_name(&self) -> &'static str {
63 T::NAME
64 }
65
66 fn as_any(&self) -> &dyn Any {
67 self
68 }
69
70 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
71 self
72 }
73
74 fn is_background(&self) -> bool {
75 matches!(T::PRIORITY, MessagePriority::Background)
76 }
77
78 fn original_sender_id(&self) -> Option<PeerId> {
79 self.original_sender_id
80 }
81
82 fn sender_id(&self) -> ConnectionId {
83 self.sender_id
84 }
85
86 fn message_id(&self) -> u32 {
87 self.message_id
88 }
89}
90
91impl PeerId {
92 pub fn from_u64(peer_id: u64) -> Self {
93 let owner_id = (peer_id >> 32) as u32;
94 let id = peer_id as u32;
95 Self { owner_id, id }
96 }
97
98 pub fn as_u64(self) -> u64 {
99 ((self.owner_id as u64) << 32) | (self.id as u64)
100 }
101}
102
103impl Copy for PeerId {}
104
105impl Eq for PeerId {}
106
107impl Ord for PeerId {
108 fn cmp(&self, other: &Self) -> cmp::Ordering {
109 self.owner_id
110 .cmp(&other.owner_id)
111 .then_with(|| self.id.cmp(&other.id))
112 }
113}
114
115impl PartialOrd for PeerId {
116 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
117 Some(self.cmp(other))
118 }
119}
120
121impl std::hash::Hash for PeerId {
122 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
123 self.owner_id.hash(state);
124 self.id.hash(state);
125 }
126}
127
128impl fmt::Display for PeerId {
129 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130 write!(f, "{}/{}", self.owner_id, self.id)
131 }
132}
133
134messages!(
135 (Ack, Foreground),
136 (AddProjectCollaborator, Foreground),
137 (ApplyCodeAction, Background),
138 (ApplyCodeActionResponse, Background),
139 (ApplyCompletionAdditionalEdits, Background),
140 (ApplyCompletionAdditionalEditsResponse, Background),
141 (BufferReloaded, Foreground),
142 (BufferSaved, Foreground),
143 (Call, Foreground),
144 (CallCanceled, Foreground),
145 (CancelCall, Foreground),
146 (CopyProjectEntry, Foreground),
147 (CreateBufferForPeer, Foreground),
148 (CreateChannel, Foreground),
149 (ChannelResponse, Foreground),
150 (ChannelMessageSent, Foreground),
151 (CreateProjectEntry, Foreground),
152 (CreateRoom, Foreground),
153 (CreateRoomResponse, Foreground),
154 (DeclineCall, Foreground),
155 (DeleteProjectEntry, Foreground),
156 (Error, Foreground),
157 (ExpandProjectEntry, Foreground),
158 (Follow, Foreground),
159 (FollowResponse, Foreground),
160 (FormatBuffers, Foreground),
161 (FormatBuffersResponse, Foreground),
162 (FuzzySearchUsers, Foreground),
163 (GetCodeActions, Background),
164 (GetCodeActionsResponse, Background),
165 (GetHover, Background),
166 (GetHoverResponse, Background),
167 (GetChannelMessages, Background),
168 (GetChannelMessagesResponse, Background),
169 (SendChannelMessage, Background),
170 (SendChannelMessageResponse, Background),
171 (GetCompletions, Background),
172 (GetCompletionsResponse, Background),
173 (GetDefinition, Background),
174 (GetDefinitionResponse, Background),
175 (GetTypeDefinition, Background),
176 (GetTypeDefinitionResponse, Background),
177 (GetDocumentHighlights, Background),
178 (GetDocumentHighlightsResponse, Background),
179 (GetReferences, Background),
180 (GetReferencesResponse, Background),
181 (GetProjectSymbols, Background),
182 (GetProjectSymbolsResponse, Background),
183 (GetUsers, Foreground),
184 (Hello, Foreground),
185 (IncomingCall, Foreground),
186 (InviteChannelMember, Foreground),
187 (UsersResponse, Foreground),
188 (JoinProject, Foreground),
189 (JoinProjectResponse, Foreground),
190 (JoinRoom, Foreground),
191 (JoinRoomResponse, Foreground),
192 (JoinChannelChat, Foreground),
193 (JoinChannelChatResponse, Foreground),
194 (LeaveChannelChat, Foreground),
195 (LeaveProject, Foreground),
196 (LeaveRoom, Foreground),
197 (OpenBufferById, Background),
198 (OpenBufferByPath, Background),
199 (OpenBufferForSymbol, Background),
200 (OpenBufferForSymbolResponse, Background),
201 (OpenBufferResponse, Background),
202 (PerformRename, Background),
203 (PerformRenameResponse, Background),
204 (OnTypeFormatting, Background),
205 (OnTypeFormattingResponse, Background),
206 (InlayHints, Background),
207 (InlayHintsResponse, Background),
208 (ResolveInlayHint, Background),
209 (ResolveInlayHintResponse, Background),
210 (RefreshInlayHints, Foreground),
211 (Ping, Foreground),
212 (PrepareRename, Background),
213 (PrepareRenameResponse, Background),
214 (ExpandProjectEntryResponse, Foreground),
215 (ProjectEntryResponse, Foreground),
216 (RejoinRoom, Foreground),
217 (RejoinRoomResponse, Foreground),
218 (RemoveContact, Foreground),
219 (RemoveChannelMember, Foreground),
220 (ReloadBuffers, Foreground),
221 (ReloadBuffersResponse, Foreground),
222 (RemoveProjectCollaborator, Foreground),
223 (RenameProjectEntry, Foreground),
224 (RequestContact, Foreground),
225 (RespondToContactRequest, Foreground),
226 (RespondToChannelInvite, Foreground),
227 (JoinChannel, Foreground),
228 (RoomUpdated, Foreground),
229 (SaveBuffer, Foreground),
230 (RenameChannel, Foreground),
231 (SetChannelMemberAdmin, Foreground),
232 (SearchProject, Background),
233 (SearchProjectResponse, Background),
234 (ShareProject, Foreground),
235 (ShareProjectResponse, Foreground),
236 (ShowContacts, Foreground),
237 (StartLanguageServer, Foreground),
238 (SynchronizeBuffers, Foreground),
239 (SynchronizeBuffersResponse, Foreground),
240 (RejoinChannelBuffers, Foreground),
241 (RejoinChannelBuffersResponse, Foreground),
242 (Test, Foreground),
243 (Unfollow, Foreground),
244 (UnshareProject, Foreground),
245 (UpdateBuffer, Foreground),
246 (UpdateBufferFile, Foreground),
247 (UpdateContacts, Foreground),
248 (RemoveChannel, Foreground),
249 (UpdateChannels, Foreground),
250 (UpdateDiagnosticSummary, Foreground),
251 (UpdateFollowers, Foreground),
252 (UpdateInviteInfo, Foreground),
253 (UpdateLanguageServer, Foreground),
254 (UpdateParticipantLocation, Foreground),
255 (UpdateProject, Foreground),
256 (UpdateProjectCollaborator, Foreground),
257 (UpdateWorktree, Foreground),
258 (UpdateWorktreeSettings, Foreground),
259 (UpdateDiffBase, Foreground),
260 (GetPrivateUserInfo, Foreground),
261 (GetPrivateUserInfoResponse, Foreground),
262 (GetChannelMembers, Foreground),
263 (GetChannelMembersResponse, Foreground),
264 (JoinChannelBuffer, Foreground),
265 (JoinChannelBufferResponse, Foreground),
266 (LeaveChannelBuffer, Background),
267 (UpdateChannelBuffer, Foreground),
268 (RemoveChannelBufferCollaborator, Foreground),
269 (AddChannelBufferCollaborator, Foreground),
270 (UpdateChannelBufferCollaborator, Foreground),
271);
272
273request_messages!(
274 (ApplyCodeAction, ApplyCodeActionResponse),
275 (
276 ApplyCompletionAdditionalEdits,
277 ApplyCompletionAdditionalEditsResponse
278 ),
279 (Call, Ack),
280 (CancelCall, Ack),
281 (CopyProjectEntry, ProjectEntryResponse),
282 (CreateProjectEntry, ProjectEntryResponse),
283 (CreateRoom, CreateRoomResponse),
284 (CreateChannel, ChannelResponse),
285 (DeclineCall, Ack),
286 (DeleteProjectEntry, ProjectEntryResponse),
287 (ExpandProjectEntry, ExpandProjectEntryResponse),
288 (Follow, FollowResponse),
289 (FormatBuffers, FormatBuffersResponse),
290 (GetCodeActions, GetCodeActionsResponse),
291 (GetHover, GetHoverResponse),
292 (GetCompletions, GetCompletionsResponse),
293 (GetDefinition, GetDefinitionResponse),
294 (GetTypeDefinition, GetTypeDefinitionResponse),
295 (GetDocumentHighlights, GetDocumentHighlightsResponse),
296 (GetReferences, GetReferencesResponse),
297 (GetPrivateUserInfo, GetPrivateUserInfoResponse),
298 (GetProjectSymbols, GetProjectSymbolsResponse),
299 (FuzzySearchUsers, UsersResponse),
300 (GetUsers, UsersResponse),
301 (InviteChannelMember, Ack),
302 (JoinProject, JoinProjectResponse),
303 (JoinRoom, JoinRoomResponse),
304 (JoinChannelChat, JoinChannelChatResponse),
305 (LeaveRoom, Ack),
306 (RejoinRoom, RejoinRoomResponse),
307 (IncomingCall, Ack),
308 (OpenBufferById, OpenBufferResponse),
309 (OpenBufferByPath, OpenBufferResponse),
310 (OpenBufferForSymbol, OpenBufferForSymbolResponse),
311 (Ping, Ack),
312 (PerformRename, PerformRenameResponse),
313 (PrepareRename, PrepareRenameResponse),
314 (OnTypeFormatting, OnTypeFormattingResponse),
315 (InlayHints, InlayHintsResponse),
316 (ResolveInlayHint, ResolveInlayHintResponse),
317 (RefreshInlayHints, Ack),
318 (ReloadBuffers, ReloadBuffersResponse),
319 (RequestContact, Ack),
320 (RemoveChannelMember, Ack),
321 (RemoveContact, Ack),
322 (RespondToContactRequest, Ack),
323 (RespondToChannelInvite, Ack),
324 (SetChannelMemberAdmin, Ack),
325 (SendChannelMessage, SendChannelMessageResponse),
326 (GetChannelMessages, GetChannelMessagesResponse),
327 (GetChannelMembers, GetChannelMembersResponse),
328 (JoinChannel, JoinRoomResponse),
329 (RemoveChannel, Ack),
330 (RenameProjectEntry, ProjectEntryResponse),
331 (RenameChannel, ChannelResponse),
332 (SaveBuffer, BufferSaved),
333 (SearchProject, SearchProjectResponse),
334 (ShareProject, ShareProjectResponse),
335 (SynchronizeBuffers, SynchronizeBuffersResponse),
336 (RejoinChannelBuffers, RejoinChannelBuffersResponse),
337 (Test, Test),
338 (UpdateBuffer, Ack),
339 (UpdateParticipantLocation, Ack),
340 (UpdateProject, Ack),
341 (UpdateWorktree, Ack),
342 (JoinChannelBuffer, JoinChannelBufferResponse),
343 (LeaveChannelBuffer, Ack)
344);
345
346entity_messages!(
347 project_id,
348 AddProjectCollaborator,
349 ApplyCodeAction,
350 ApplyCompletionAdditionalEdits,
351 BufferReloaded,
352 BufferSaved,
353 CopyProjectEntry,
354 CreateBufferForPeer,
355 CreateProjectEntry,
356 DeleteProjectEntry,
357 ExpandProjectEntry,
358 Follow,
359 FormatBuffers,
360 GetCodeActions,
361 GetCompletions,
362 GetDefinition,
363 GetTypeDefinition,
364 GetDocumentHighlights,
365 GetHover,
366 GetReferences,
367 GetProjectSymbols,
368 JoinProject,
369 LeaveProject,
370 OpenBufferById,
371 OpenBufferByPath,
372 OpenBufferForSymbol,
373 PerformRename,
374 OnTypeFormatting,
375 InlayHints,
376 ResolveInlayHint,
377 RefreshInlayHints,
378 PrepareRename,
379 ReloadBuffers,
380 RemoveProjectCollaborator,
381 RenameProjectEntry,
382 SaveBuffer,
383 SearchProject,
384 StartLanguageServer,
385 SynchronizeBuffers,
386 Unfollow,
387 UnshareProject,
388 UpdateBuffer,
389 UpdateBufferFile,
390 UpdateDiagnosticSummary,
391 UpdateFollowers,
392 UpdateLanguageServer,
393 UpdateProject,
394 UpdateProjectCollaborator,
395 UpdateWorktree,
396 UpdateWorktreeSettings,
397 UpdateDiffBase
398);
399
400entity_messages!(
401 channel_id,
402 ChannelMessageSent,
403 UpdateChannelBuffer,
404 RemoveChannelBufferCollaborator,
405 AddChannelBufferCollaborator,
406 UpdateChannelBufferCollaborator
407);
408
409const KIB: usize = 1024;
410const MIB: usize = KIB * 1024;
411const MAX_BUFFER_LEN: usize = MIB;
412
413/// A stream of protobuf messages.
414pub struct MessageStream<S> {
415 stream: S,
416 encoding_buffer: Vec<u8>,
417}
418
419#[allow(clippy::large_enum_variant)]
420#[derive(Debug)]
421pub enum Message {
422 Envelope(Envelope),
423 Ping,
424 Pong,
425}
426
427impl<S> MessageStream<S> {
428 pub fn new(stream: S) -> Self {
429 Self {
430 stream,
431 encoding_buffer: Vec::new(),
432 }
433 }
434
435 pub fn inner_mut(&mut self) -> &mut S {
436 &mut self.stream
437 }
438}
439
440impl<S> MessageStream<S>
441where
442 S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
443{
444 pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
445 #[cfg(any(test, feature = "test-support"))]
446 const COMPRESSION_LEVEL: i32 = -7;
447
448 #[cfg(not(any(test, feature = "test-support")))]
449 const COMPRESSION_LEVEL: i32 = 4;
450
451 match message {
452 Message::Envelope(message) => {
453 self.encoding_buffer.reserve(message.encoded_len());
454 message
455 .encode(&mut self.encoding_buffer)
456 .map_err(io::Error::from)?;
457 let buffer =
458 zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
459 .unwrap();
460
461 self.encoding_buffer.clear();
462 self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
463 self.stream.send(WebSocketMessage::Binary(buffer)).await?;
464 }
465 Message::Ping => {
466 self.stream
467 .send(WebSocketMessage::Ping(Default::default()))
468 .await?;
469 }
470 Message::Pong => {
471 self.stream
472 .send(WebSocketMessage::Pong(Default::default()))
473 .await?;
474 }
475 }
476
477 Ok(())
478 }
479}
480
481impl<S> MessageStream<S>
482where
483 S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
484{
485 pub async fn read(&mut self) -> Result<Message, anyhow::Error> {
486 while let Some(bytes) = self.stream.next().await {
487 match bytes? {
488 WebSocketMessage::Binary(bytes) => {
489 zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
490 let envelope = Envelope::decode(self.encoding_buffer.as_slice())
491 .map_err(io::Error::from)?;
492
493 self.encoding_buffer.clear();
494 self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
495 return Ok(Message::Envelope(envelope));
496 }
497 WebSocketMessage::Ping(_) => return Ok(Message::Ping),
498 WebSocketMessage::Pong(_) => return Ok(Message::Pong),
499 WebSocketMessage::Close(_) => break,
500 _ => {}
501 }
502 }
503 Err(anyhow!("connection closed"))
504 }
505}
506
507impl From<Timestamp> for SystemTime {
508 fn from(val: Timestamp) -> Self {
509 UNIX_EPOCH
510 .checked_add(Duration::new(val.seconds, val.nanos))
511 .unwrap()
512 }
513}
514
515impl From<SystemTime> for Timestamp {
516 fn from(time: SystemTime) -> Self {
517 let duration = time.duration_since(UNIX_EPOCH).unwrap();
518 Self {
519 seconds: duration.as_secs(),
520 nanos: duration.subsec_nanos(),
521 }
522 }
523}
524
525impl From<u128> for Nonce {
526 fn from(nonce: u128) -> Self {
527 let upper_half = (nonce >> 64) as u64;
528 let lower_half = nonce as u64;
529 Self {
530 upper_half,
531 lower_half,
532 }
533 }
534}
535
536impl From<Nonce> for u128 {
537 fn from(nonce: Nonce) -> Self {
538 let upper_half = (nonce.upper_half as u128) << 64;
539 let lower_half = nonce.lower_half as u128;
540 upper_half | lower_half
541 }
542}
543
544pub fn split_worktree_update(
545 mut message: UpdateWorktree,
546 max_chunk_size: usize,
547) -> impl Iterator<Item = UpdateWorktree> {
548 let mut done_files = false;
549
550 let mut repository_map = message
551 .updated_repositories
552 .into_iter()
553 .map(|repo| (repo.work_directory_id, repo))
554 .collect::<HashMap<_, _>>();
555
556 iter::from_fn(move || {
557 if done_files {
558 return None;
559 }
560
561 let updated_entries_chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
562 let updated_entries: Vec<_> = message
563 .updated_entries
564 .drain(..updated_entries_chunk_size)
565 .collect();
566
567 let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size);
568 let removed_entries = message
569 .removed_entries
570 .drain(..removed_entries_chunk_size)
571 .collect();
572
573 done_files = message.updated_entries.is_empty() && message.removed_entries.is_empty();
574
575 let mut updated_repositories = Vec::new();
576
577 if !repository_map.is_empty() {
578 for entry in &updated_entries {
579 if let Some(repo) = repository_map.remove(&entry.id) {
580 updated_repositories.push(repo)
581 }
582 }
583 }
584
585 let removed_repositories = if done_files {
586 mem::take(&mut message.removed_repositories)
587 } else {
588 Default::default()
589 };
590
591 if done_files {
592 updated_repositories.extend(mem::take(&mut repository_map).into_values());
593 }
594
595 Some(UpdateWorktree {
596 project_id: message.project_id,
597 worktree_id: message.worktree_id,
598 root_name: message.root_name.clone(),
599 abs_path: message.abs_path.clone(),
600 updated_entries,
601 removed_entries,
602 scan_id: message.scan_id,
603 is_last_update: done_files && message.is_last_update,
604 updated_repositories,
605 removed_repositories,
606 })
607 })
608}
609
610#[cfg(test)]
611mod tests {
612 use super::*;
613
614 #[gpui::test]
615 async fn test_buffer_size() {
616 let (tx, rx) = futures::channel::mpsc::unbounded();
617 let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
618 sink.write(Message::Envelope(Envelope {
619 payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
620 root_name: "abcdefg".repeat(10),
621 ..Default::default()
622 })),
623 ..Default::default()
624 }))
625 .await
626 .unwrap();
627 assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
628 sink.write(Message::Envelope(Envelope {
629 payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
630 root_name: "abcdefg".repeat(1000000),
631 ..Default::default()
632 })),
633 ..Default::default()
634 }))
635 .await
636 .unwrap();
637 assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
638
639 let mut stream = MessageStream::new(rx.map(anyhow::Ok));
640 stream.read().await.unwrap();
641 assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
642 stream.read().await.unwrap();
643 assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
644 }
645
646 #[gpui::test]
647 fn test_converting_peer_id_from_and_to_u64() {
648 let peer_id = PeerId {
649 owner_id: 10,
650 id: 3,
651 };
652 assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
653 let peer_id = PeerId {
654 owner_id: u32::MAX,
655 id: 3,
656 };
657 assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
658 let peer_id = PeerId {
659 owner_id: 10,
660 id: u32::MAX,
661 };
662 assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
663 let peer_id = PeerId {
664 owner_id: u32::MAX,
665 id: u32::MAX,
666 };
667 assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
668 }
669}