1#![allow(non_snake_case)]
2
3use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
4use anyhow::{anyhow, Result};
5use async_tungstenite::tungstenite::Message as WebSocketMessage;
6use collections::HashMap;
7use futures::{SinkExt as _, StreamExt as _};
8use prost::Message as _;
9use serde::Serialize;
10use std::any::{Any, TypeId};
11use std::{
12 cmp,
13 fmt::Debug,
14 io, iter,
15 time::{Duration, SystemTime, UNIX_EPOCH},
16};
17use std::{fmt, mem};
18
19include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
20
21pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
22 const NAME: &'static str;
23 const PRIORITY: MessagePriority;
24 fn into_envelope(
25 self,
26 id: u32,
27 responding_to: Option<u32>,
28 original_sender_id: Option<PeerId>,
29 ) -> Envelope;
30 fn from_envelope(envelope: Envelope) -> Option<Self>;
31}
32
33pub trait EntityMessage: EnvelopedMessage {
34 fn remote_entity_id(&self) -> u64;
35}
36
37pub trait RequestMessage: EnvelopedMessage {
38 type Response: EnvelopedMessage;
39}
40
41pub trait AnyTypedEnvelope: 'static + Send + Sync {
42 fn payload_type_id(&self) -> TypeId;
43 fn payload_type_name(&self) -> &'static str;
44 fn as_any(&self) -> &dyn Any;
45 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
46 fn is_background(&self) -> bool;
47 fn original_sender_id(&self) -> Option<PeerId>;
48 fn sender_id(&self) -> ConnectionId;
49 fn message_id(&self) -> u32;
50}
51
/// Priority class attached to every message type through `EnvelopedMessage::PRIORITY`.
pub enum MessagePriority {
    /// The message is not reported as background by `AnyTypedEnvelope::is_background`.
    Foreground,
    /// The message is reported as background by `AnyTypedEnvelope::is_background`.
    Background,
}
56
57impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
58 fn payload_type_id(&self) -> TypeId {
59 TypeId::of::<T>()
60 }
61
62 fn payload_type_name(&self) -> &'static str {
63 T::NAME
64 }
65
66 fn as_any(&self) -> &dyn Any {
67 self
68 }
69
70 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
71 self
72 }
73
74 fn is_background(&self) -> bool {
75 matches!(T::PRIORITY, MessagePriority::Background)
76 }
77
78 fn original_sender_id(&self) -> Option<PeerId> {
79 self.original_sender_id
80 }
81
82 fn sender_id(&self) -> ConnectionId {
83 self.sender_id
84 }
85
86 fn message_id(&self) -> u32 {
87 self.message_id
88 }
89}
90
91impl PeerId {
92 pub fn from_u64(peer_id: u64) -> Self {
93 let owner_id = (peer_id >> 32) as u32;
94 let id = peer_id as u32;
95 Self { owner_id, id }
96 }
97
98 pub fn as_u64(self) -> u64 {
99 ((self.owner_id as u64) << 32) | (self.id as u64)
100 }
101}
102
103impl Copy for PeerId {}
104
105impl Eq for PeerId {}
106
107impl Ord for PeerId {
108 fn cmp(&self, other: &Self) -> cmp::Ordering {
109 self.owner_id
110 .cmp(&other.owner_id)
111 .then_with(|| self.id.cmp(&other.id))
112 }
113}
114
115impl PartialOrd for PeerId {
116 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
117 Some(self.cmp(other))
118 }
119}
120
121impl std::hash::Hash for PeerId {
122 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
123 self.owner_id.hash(state);
124 self.id.hash(state);
125 }
126}
127
128impl fmt::Display for PeerId {
129 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130 write!(f, "{}/{}", self.owner_id, self.id)
131 }
132}
133
134messages!(
135 (Ack, Foreground),
136 (AddProjectCollaborator, Foreground),
137 (ApplyCodeAction, Background),
138 (ApplyCodeActionResponse, Background),
139 (ApplyCompletionAdditionalEdits, Background),
140 (ApplyCompletionAdditionalEditsResponse, Background),
141 (BufferReloaded, Foreground),
142 (BufferSaved, Foreground),
143 (Call, Foreground),
144 (CallCanceled, Foreground),
145 (CancelCall, Foreground),
146 (CopyProjectEntry, Foreground),
147 (CreateBufferForPeer, Foreground),
148 (CreateChannel, Foreground),
149 (ChannelResponse, Foreground),
150 (ChannelMessageSent, Foreground),
151 (CreateProjectEntry, Foreground),
152 (CreateRoom, Foreground),
153 (CreateRoomResponse, Foreground),
154 (DeclineCall, Foreground),
155 (DeleteProjectEntry, Foreground),
156 (Error, Foreground),
157 (ExpandProjectEntry, Foreground),
158 (Follow, Foreground),
159 (FollowResponse, Foreground),
160 (FormatBuffers, Foreground),
161 (FormatBuffersResponse, Foreground),
162 (FuzzySearchUsers, Foreground),
163 (GetCodeActions, Background),
164 (GetCodeActionsResponse, Background),
165 (GetHover, Background),
166 (GetHoverResponse, Background),
167 (GetChannelMessages, Background),
168 (GetChannelMessagesResponse, Background),
169 (SendChannelMessage, Background),
170 (SendChannelMessageResponse, Background),
171 (GetCompletions, Background),
172 (GetCompletionsResponse, Background),
173 (GetDefinition, Background),
174 (GetDefinitionResponse, Background),
175 (GetTypeDefinition, Background),
176 (GetTypeDefinitionResponse, Background),
177 (GetDocumentHighlights, Background),
178 (GetDocumentHighlightsResponse, Background),
179 (GetReferences, Background),
180 (GetReferencesResponse, Background),
181 (GetProjectSymbols, Background),
182 (GetProjectSymbolsResponse, Background),
183 (GetUsers, Foreground),
184 (Hello, Foreground),
185 (IncomingCall, Foreground),
186 (InviteChannelMember, Foreground),
187 (UsersResponse, Foreground),
188 (JoinProject, Foreground),
189 (JoinProjectResponse, Foreground),
190 (JoinRoom, Foreground),
191 (JoinRoomResponse, Foreground),
192 (JoinChannelChat, Foreground),
193 (JoinChannelChatResponse, Foreground),
194 (LeaveChannelChat, Foreground),
195 (LeaveProject, Foreground),
196 (LeaveRoom, Foreground),
197 (OpenBufferById, Background),
198 (OpenBufferByPath, Background),
199 (OpenBufferForSymbol, Background),
200 (OpenBufferForSymbolResponse, Background),
201 (OpenBufferResponse, Background),
202 (PerformRename, Background),
203 (PerformRenameResponse, Background),
204 (OnTypeFormatting, Background),
205 (OnTypeFormattingResponse, Background),
206 (InlayHints, Background),
207 (InlayHintsResponse, Background),
208 (ResolveInlayHint, Background),
209 (ResolveInlayHintResponse, Background),
210 (RefreshInlayHints, Foreground),
211 (Ping, Foreground),
212 (PrepareRename, Background),
213 (PrepareRenameResponse, Background),
214 (ExpandProjectEntryResponse, Foreground),
215 (ProjectEntryResponse, Foreground),
216 (RejoinRoom, Foreground),
217 (RejoinRoomResponse, Foreground),
218 (RemoveContact, Foreground),
219 (RemoveChannelMember, Foreground),
220 (RemoveChannelMessage, Foreground),
221 (ReloadBuffers, Foreground),
222 (ReloadBuffersResponse, Foreground),
223 (RemoveProjectCollaborator, Foreground),
224 (RenameProjectEntry, Foreground),
225 (RequestContact, Foreground),
226 (RespondToContactRequest, Foreground),
227 (RespondToChannelInvite, Foreground),
228 (JoinChannel, Foreground),
229 (RoomUpdated, Foreground),
230 (SaveBuffer, Foreground),
231 (RenameChannel, Foreground),
232 (SetChannelMemberAdmin, Foreground),
233 (SearchProject, Background),
234 (SearchProjectResponse, Background),
235 (ShareProject, Foreground),
236 (ShareProjectResponse, Foreground),
237 (ShowContacts, Foreground),
238 (StartLanguageServer, Foreground),
239 (SynchronizeBuffers, Foreground),
240 (SynchronizeBuffersResponse, Foreground),
241 (RejoinChannelBuffers, Foreground),
242 (RejoinChannelBuffersResponse, Foreground),
243 (Test, Foreground),
244 (Unfollow, Foreground),
245 (UnshareProject, Foreground),
246 (UpdateBuffer, Foreground),
247 (UpdateBufferFile, Foreground),
248 (UpdateContacts, Foreground),
249 (DeleteChannel, Foreground),
250 (MoveChannel, Foreground),
251 (UpdateChannels, Foreground),
252 (UpdateDiagnosticSummary, Foreground),
253 (UpdateFollowers, Foreground),
254 (UpdateInviteInfo, Foreground),
255 (UpdateLanguageServer, Foreground),
256 (UpdateParticipantLocation, Foreground),
257 (UpdateProject, Foreground),
258 (UpdateProjectCollaborator, Foreground),
259 (UpdateWorktree, Foreground),
260 (UpdateWorktreeSettings, Foreground),
261 (UpdateDiffBase, Foreground),
262 (GetPrivateUserInfo, Foreground),
263 (GetPrivateUserInfoResponse, Foreground),
264 (GetChannelMembers, Foreground),
265 (GetChannelMembersResponse, Foreground),
266 (JoinChannelBuffer, Foreground),
267 (JoinChannelBufferResponse, Foreground),
268 (LeaveChannelBuffer, Background),
269 (UpdateChannelBuffer, Foreground),
270 (RemoveChannelBufferCollaborator, Foreground),
271 (AddChannelBufferCollaborator, Foreground),
272 (UpdateChannelBufferCollaborator, Foreground),
273);
274
275request_messages!(
276 (ApplyCodeAction, ApplyCodeActionResponse),
277 (
278 ApplyCompletionAdditionalEdits,
279 ApplyCompletionAdditionalEditsResponse
280 ),
281 (Call, Ack),
282 (CancelCall, Ack),
283 (CopyProjectEntry, ProjectEntryResponse),
284 (CreateProjectEntry, ProjectEntryResponse),
285 (CreateRoom, CreateRoomResponse),
286 (CreateChannel, ChannelResponse),
287 (DeclineCall, Ack),
288 (DeleteProjectEntry, ProjectEntryResponse),
289 (ExpandProjectEntry, ExpandProjectEntryResponse),
290 (Follow, FollowResponse),
291 (FormatBuffers, FormatBuffersResponse),
292 (GetCodeActions, GetCodeActionsResponse),
293 (GetHover, GetHoverResponse),
294 (GetCompletions, GetCompletionsResponse),
295 (GetDefinition, GetDefinitionResponse),
296 (GetTypeDefinition, GetTypeDefinitionResponse),
297 (GetDocumentHighlights, GetDocumentHighlightsResponse),
298 (GetReferences, GetReferencesResponse),
299 (GetPrivateUserInfo, GetPrivateUserInfoResponse),
300 (GetProjectSymbols, GetProjectSymbolsResponse),
301 (FuzzySearchUsers, UsersResponse),
302 (GetUsers, UsersResponse),
303 (InviteChannelMember, Ack),
304 (JoinProject, JoinProjectResponse),
305 (JoinRoom, JoinRoomResponse),
306 (JoinChannelChat, JoinChannelChatResponse),
307 (LeaveRoom, Ack),
308 (RejoinRoom, RejoinRoomResponse),
309 (IncomingCall, Ack),
310 (OpenBufferById, OpenBufferResponse),
311 (OpenBufferByPath, OpenBufferResponse),
312 (OpenBufferForSymbol, OpenBufferForSymbolResponse),
313 (Ping, Ack),
314 (PerformRename, PerformRenameResponse),
315 (PrepareRename, PrepareRenameResponse),
316 (OnTypeFormatting, OnTypeFormattingResponse),
317 (InlayHints, InlayHintsResponse),
318 (ResolveInlayHint, ResolveInlayHintResponse),
319 (RefreshInlayHints, Ack),
320 (ReloadBuffers, ReloadBuffersResponse),
321 (RequestContact, Ack),
322 (RemoveChannelMember, Ack),
323 (RemoveContact, Ack),
324 (RespondToContactRequest, Ack),
325 (RespondToChannelInvite, Ack),
326 (SetChannelMemberAdmin, Ack),
327 (SendChannelMessage, SendChannelMessageResponse),
328 (GetChannelMessages, GetChannelMessagesResponse),
329 (GetChannelMembers, GetChannelMembersResponse),
330 (JoinChannel, JoinRoomResponse),
331 (RemoveChannel, Ack),
332 (RemoveChannelMessage, Ack),
333 (DeleteChannel, Ack),
334 (RenameProjectEntry, ProjectEntryResponse),
335 (RenameChannel, ChannelResponse),
336 (MoveChannel, Ack),
337 (SaveBuffer, BufferSaved),
338 (SearchProject, SearchProjectResponse),
339 (ShareProject, ShareProjectResponse),
340 (SynchronizeBuffers, SynchronizeBuffersResponse),
341 (RejoinChannelBuffers, RejoinChannelBuffersResponse),
342 (Test, Test),
343 (UpdateBuffer, Ack),
344 (UpdateParticipantLocation, Ack),
345 (UpdateProject, Ack),
346 (UpdateWorktree, Ack),
347 (JoinChannelBuffer, JoinChannelBufferResponse),
348 (LeaveChannelBuffer, Ack)
349);
350
351entity_messages!(
352 project_id,
353 AddProjectCollaborator,
354 ApplyCodeAction,
355 ApplyCompletionAdditionalEdits,
356 BufferReloaded,
357 BufferSaved,
358 CopyProjectEntry,
359 CreateBufferForPeer,
360 CreateProjectEntry,
361 DeleteProjectEntry,
362 ExpandProjectEntry,
363 Follow,
364 FormatBuffers,
365 GetCodeActions,
366 GetCompletions,
367 GetDefinition,
368 GetTypeDefinition,
369 GetDocumentHighlights,
370 GetHover,
371 GetReferences,
372 GetProjectSymbols,
373 JoinProject,
374 LeaveProject,
375 OpenBufferById,
376 OpenBufferByPath,
377 OpenBufferForSymbol,
378 PerformRename,
379 OnTypeFormatting,
380 InlayHints,
381 ResolveInlayHint,
382 RefreshInlayHints,
383 PrepareRename,
384 ReloadBuffers,
385 RemoveProjectCollaborator,
386 RenameProjectEntry,
387 SaveBuffer,
388 SearchProject,
389 StartLanguageServer,
390 SynchronizeBuffers,
391 Unfollow,
392 UnshareProject,
393 UpdateBuffer,
394 UpdateBufferFile,
395 UpdateDiagnosticSummary,
396 UpdateFollowers,
397 UpdateLanguageServer,
398 UpdateProject,
399 UpdateProjectCollaborator,
400 UpdateWorktree,
401 UpdateWorktreeSettings,
402 UpdateDiffBase
403);
404
405entity_messages!(
406 channel_id,
407 ChannelMessageSent,
408 UpdateChannelBuffer,
409 RemoveChannelBufferCollaborator,
410 RemoveChannelMessage,
411 AddChannelBufferCollaborator,
412 UpdateChannelBufferCollaborator
413);
414
/// Number of bytes in one kibibyte.
const KIB: usize = 1024;
/// Number of bytes in one mebibyte.
const MIB: usize = 1024 * KIB;
/// Largest capacity (in bytes) a `MessageStream` keeps in its scratch buffer between
/// messages; the buffer is shrunk to this size after every envelope.
const MAX_BUFFER_LEN: usize = MIB;
418
/// A stream of protobuf messages layered over a websocket-style transport `S`.
pub struct MessageStream<S> {
    /// The wrapped transport; used as a sink by `write` and as a stream by `read`.
    stream: S,
    /// Scratch buffer reused across calls: holds the encoded protobuf bytes while
    /// writing and the decompressed bytes while reading.
    encoding_buffer: Vec<u8>,
}
424
425#[allow(clippy::large_enum_variant)]
426#[derive(Debug)]
427pub enum Message {
428 Envelope(Envelope),
429 Ping,
430 Pong,
431}
432
433impl<S> MessageStream<S> {
434 pub fn new(stream: S) -> Self {
435 Self {
436 stream,
437 encoding_buffer: Vec::new(),
438 }
439 }
440
441 pub fn inner_mut(&mut self) -> &mut S {
442 &mut self.stream
443 }
444}
445
446impl<S> MessageStream<S>
447where
448 S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
449{
450 pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
451 #[cfg(any(test, feature = "test-support"))]
452 const COMPRESSION_LEVEL: i32 = -7;
453
454 #[cfg(not(any(test, feature = "test-support")))]
455 const COMPRESSION_LEVEL: i32 = 4;
456
457 match message {
458 Message::Envelope(message) => {
459 self.encoding_buffer.reserve(message.encoded_len());
460 message
461 .encode(&mut self.encoding_buffer)
462 .map_err(io::Error::from)?;
463 let buffer =
464 zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
465 .unwrap();
466
467 self.encoding_buffer.clear();
468 self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
469 self.stream.send(WebSocketMessage::Binary(buffer)).await?;
470 }
471 Message::Ping => {
472 self.stream
473 .send(WebSocketMessage::Ping(Default::default()))
474 .await?;
475 }
476 Message::Pong => {
477 self.stream
478 .send(WebSocketMessage::Pong(Default::default()))
479 .await?;
480 }
481 }
482
483 Ok(())
484 }
485}
486
487impl<S> MessageStream<S>
488where
489 S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
490{
491 pub async fn read(&mut self) -> Result<Message, anyhow::Error> {
492 while let Some(bytes) = self.stream.next().await {
493 match bytes? {
494 WebSocketMessage::Binary(bytes) => {
495 zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
496 let envelope = Envelope::decode(self.encoding_buffer.as_slice())
497 .map_err(io::Error::from)?;
498
499 self.encoding_buffer.clear();
500 self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
501 return Ok(Message::Envelope(envelope));
502 }
503 WebSocketMessage::Ping(_) => return Ok(Message::Ping),
504 WebSocketMessage::Pong(_) => return Ok(Message::Pong),
505 WebSocketMessage::Close(_) => break,
506 _ => {}
507 }
508 }
509 Err(anyhow!("connection closed"))
510 }
511}
512
513impl From<Timestamp> for SystemTime {
514 fn from(val: Timestamp) -> Self {
515 UNIX_EPOCH
516 .checked_add(Duration::new(val.seconds, val.nanos))
517 .unwrap()
518 }
519}
520
521impl From<SystemTime> for Timestamp {
522 fn from(time: SystemTime) -> Self {
523 let duration = time.duration_since(UNIX_EPOCH).unwrap();
524 Self {
525 seconds: duration.as_secs(),
526 nanos: duration.subsec_nanos(),
527 }
528 }
529}
530
531impl From<u128> for Nonce {
532 fn from(nonce: u128) -> Self {
533 let upper_half = (nonce >> 64) as u64;
534 let lower_half = nonce as u64;
535 Self {
536 upper_half,
537 lower_half,
538 }
539 }
540}
541
542impl From<Nonce> for u128 {
543 fn from(nonce: Nonce) -> Self {
544 let upper_half = (nonce.upper_half as u128) << 64;
545 let lower_half = nonce.lower_half as u128;
546 upper_half | lower_half
547 }
548}
549
550pub fn split_worktree_update(
551 mut message: UpdateWorktree,
552 max_chunk_size: usize,
553) -> impl Iterator<Item = UpdateWorktree> {
554 let mut done_files = false;
555
556 let mut repository_map = message
557 .updated_repositories
558 .into_iter()
559 .map(|repo| (repo.work_directory_id, repo))
560 .collect::<HashMap<_, _>>();
561
562 iter::from_fn(move || {
563 if done_files {
564 return None;
565 }
566
567 let updated_entries_chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
568 let updated_entries: Vec<_> = message
569 .updated_entries
570 .drain(..updated_entries_chunk_size)
571 .collect();
572
573 let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size);
574 let removed_entries = message
575 .removed_entries
576 .drain(..removed_entries_chunk_size)
577 .collect();
578
579 done_files = message.updated_entries.is_empty() && message.removed_entries.is_empty();
580
581 let mut updated_repositories = Vec::new();
582
583 if !repository_map.is_empty() {
584 for entry in &updated_entries {
585 if let Some(repo) = repository_map.remove(&entry.id) {
586 updated_repositories.push(repo)
587 }
588 }
589 }
590
591 let removed_repositories = if done_files {
592 mem::take(&mut message.removed_repositories)
593 } else {
594 Default::default()
595 };
596
597 if done_files {
598 updated_repositories.extend(mem::take(&mut repository_map).into_values());
599 }
600
601 Some(UpdateWorktree {
602 project_id: message.project_id,
603 worktree_id: message.worktree_id,
604 root_name: message.root_name.clone(),
605 abs_path: message.abs_path.clone(),
606 updated_entries,
607 removed_entries,
608 scan_id: message.scan_id,
609 is_last_update: done_files && message.is_last_update,
610 updated_repositories,
611 removed_repositories,
612 })
613 })
614}
615
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an envelope message carrying an `UpdateWorktree` whose root name is
    /// `"abcdefg"` repeated `repeats` times.
    fn worktree_envelope(repeats: usize) -> Message {
        let update = UpdateWorktree {
            root_name: "abcdefg".repeat(repeats),
            ..Default::default()
        };
        Message::Envelope(Envelope {
            payload: Some(envelope::Payload::UpdateWorktree(update)),
            ..Default::default()
        })
    }

    /// The scratch buffer must never keep more than `MAX_BUFFER_LEN` bytes of capacity,
    /// for small and for very large messages, when writing and when reading.
    #[gpui::test]
    async fn test_buffer_size() {
        let (tx, rx) = futures::channel::mpsc::unbounded();

        let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
        for &repeats in &[10usize, 1_000_000] {
            sink.write(worktree_envelope(repeats)).await.unwrap();
            assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
        }

        let mut stream = MessageStream::new(rx.map(anyhow::Ok));
        for _ in 0..2 {
            stream.read().await.unwrap();
            assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
        }
    }

    /// Packing a `PeerId` into a `u64` and unpacking it again is lossless, including at
    /// the extremes of both fields.
    #[gpui::test]
    fn test_converting_peer_id_from_and_to_u64() {
        let cases = [
            (10, 3),
            (u32::MAX, 3),
            (10, u32::MAX),
            (u32::MAX, u32::MAX),
        ];
        for &(owner_id, id) in &cases {
            let peer_id = PeerId { owner_id, id };
            assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
        }
    }
}