1#![allow(non_snake_case)]
2
3use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
4use anyhow::{anyhow, Result};
5use async_tungstenite::tungstenite::Message as WebSocketMessage;
6use collections::HashMap;
7use futures::{SinkExt as _, StreamExt as _};
8use prost::Message as _;
9use serde::Serialize;
10use std::any::{Any, TypeId};
11use std::{
12 cmp,
13 fmt::Debug,
14 io, iter,
15 time::{Duration, SystemTime, UNIX_EPOCH},
16};
17use std::{fmt, mem};
18
19include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
20
21pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
22 const NAME: &'static str;
23 const PRIORITY: MessagePriority;
24 fn into_envelope(
25 self,
26 id: u32,
27 responding_to: Option<u32>,
28 original_sender_id: Option<PeerId>,
29 ) -> Envelope;
30 fn from_envelope(envelope: Envelope) -> Option<Self>;
31}
32
33pub trait EntityMessage: EnvelopedMessage {
34 fn remote_entity_id(&self) -> u64;
35}
36
37pub trait RequestMessage: EnvelopedMessage {
38 type Response: EnvelopedMessage;
39}
40
41pub trait AnyTypedEnvelope: 'static + Send + Sync {
42 fn payload_type_id(&self) -> TypeId;
43 fn payload_type_name(&self) -> &'static str;
44 fn as_any(&self) -> &dyn Any;
45 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
46 fn is_background(&self) -> bool;
47 fn original_sender_id(&self) -> Option<PeerId>;
48 fn sender_id(&self) -> ConnectionId;
49 fn message_id(&self) -> u32;
50}
51
52pub enum MessagePriority {
53 Foreground,
54 Background,
55}
56
57impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
58 fn payload_type_id(&self) -> TypeId {
59 TypeId::of::<T>()
60 }
61
62 fn payload_type_name(&self) -> &'static str {
63 T::NAME
64 }
65
66 fn as_any(&self) -> &dyn Any {
67 self
68 }
69
70 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
71 self
72 }
73
74 fn is_background(&self) -> bool {
75 matches!(T::PRIORITY, MessagePriority::Background)
76 }
77
78 fn original_sender_id(&self) -> Option<PeerId> {
79 self.original_sender_id
80 }
81
82 fn sender_id(&self) -> ConnectionId {
83 self.sender_id
84 }
85
86 fn message_id(&self) -> u32 {
87 self.message_id
88 }
89}
90
91impl PeerId {
92 pub fn from_u64(peer_id: u64) -> Self {
93 let owner_id = (peer_id >> 32) as u32;
94 let id = peer_id as u32;
95 Self { owner_id, id }
96 }
97
98 pub fn as_u64(self) -> u64 {
99 ((self.owner_id as u64) << 32) | (self.id as u64)
100 }
101}
102
103impl Copy for PeerId {}
104
105impl Eq for PeerId {}
106
107impl Ord for PeerId {
108 fn cmp(&self, other: &Self) -> cmp::Ordering {
109 self.owner_id
110 .cmp(&other.owner_id)
111 .then_with(|| self.id.cmp(&other.id))
112 }
113}
114
115impl PartialOrd for PeerId {
116 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
117 Some(self.cmp(other))
118 }
119}
120
121impl std::hash::Hash for PeerId {
122 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
123 self.owner_id.hash(state);
124 self.id.hash(state);
125 }
126}
127
128impl fmt::Display for PeerId {
129 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130 write!(f, "{}/{}", self.owner_id, self.id)
131 }
132}
133
134messages!(
135 (Ack, Foreground),
136 (AddProjectCollaborator, Foreground),
137 (ApplyCodeAction, Background),
138 (ApplyCodeActionResponse, Background),
139 (ApplyCompletionAdditionalEdits, Background),
140 (ApplyCompletionAdditionalEditsResponse, Background),
141 (BufferReloaded, Foreground),
142 (BufferSaved, Foreground),
143 (Call, Foreground),
144 (CallCanceled, Foreground),
145 (CancelCall, Foreground),
146 (CopyProjectEntry, Foreground),
147 (CreateBufferForPeer, Foreground),
148 (CreateChannel, Foreground),
149 (ChannelResponse, Foreground),
150 (CreateProjectEntry, Foreground),
151 (CreateRoom, Foreground),
152 (CreateRoomResponse, Foreground),
153 (DeclineCall, Foreground),
154 (DeleteProjectEntry, Foreground),
155 (Error, Foreground),
156 (ExpandProjectEntry, Foreground),
157 (Follow, Foreground),
158 (FollowResponse, Foreground),
159 (FormatBuffers, Foreground),
160 (FormatBuffersResponse, Foreground),
161 (FuzzySearchUsers, Foreground),
162 (GetCodeActions, Background),
163 (GetCodeActionsResponse, Background),
164 (GetHover, Background),
165 (GetHoverResponse, Background),
166 (GetCompletions, Background),
167 (GetCompletionsResponse, Background),
168 (GetDefinition, Background),
169 (GetDefinitionResponse, Background),
170 (GetTypeDefinition, Background),
171 (GetTypeDefinitionResponse, Background),
172 (GetDocumentHighlights, Background),
173 (GetDocumentHighlightsResponse, Background),
174 (GetReferences, Background),
175 (GetReferencesResponse, Background),
176 (GetProjectSymbols, Background),
177 (GetProjectSymbolsResponse, Background),
178 (GetUsers, Foreground),
179 (Hello, Foreground),
180 (IncomingCall, Foreground),
181 (InviteChannelMember, Foreground),
182 (UsersResponse, Foreground),
183 (JoinProject, Foreground),
184 (JoinProjectResponse, Foreground),
185 (JoinRoom, Foreground),
186 (JoinRoomResponse, Foreground),
187 (LeaveProject, Foreground),
188 (LeaveRoom, Foreground),
189 (OpenBufferById, Background),
190 (OpenBufferByPath, Background),
191 (OpenBufferForSymbol, Background),
192 (OpenBufferForSymbolResponse, Background),
193 (OpenBufferResponse, Background),
194 (PerformRename, Background),
195 (PerformRenameResponse, Background),
196 (OnTypeFormatting, Background),
197 (OnTypeFormattingResponse, Background),
198 (InlayHints, Background),
199 (InlayHintsResponse, Background),
200 (RefreshInlayHints, Foreground),
201 (Ping, Foreground),
202 (PrepareRename, Background),
203 (PrepareRenameResponse, Background),
204 (ExpandProjectEntryResponse, Foreground),
205 (ProjectEntryResponse, Foreground),
206 (RejoinRoom, Foreground),
207 (RejoinRoomResponse, Foreground),
208 (RemoveContact, Foreground),
209 (RemoveChannelMember, Foreground),
210 (ReloadBuffers, Foreground),
211 (ReloadBuffersResponse, Foreground),
212 (RemoveProjectCollaborator, Foreground),
213 (RenameProjectEntry, Foreground),
214 (RequestContact, Foreground),
215 (RespondToContactRequest, Foreground),
216 (RespondToChannelInvite, Foreground),
217 (JoinChannel, Foreground),
218 (RoomUpdated, Foreground),
219 (SaveBuffer, Foreground),
220 (RenameChannel, Foreground),
221 (SetChannelMemberAdmin, Foreground),
222 (SearchProject, Background),
223 (SearchProjectResponse, Background),
224 (ShareProject, Foreground),
225 (ShareProjectResponse, Foreground),
226 (ShowContacts, Foreground),
227 (StartLanguageServer, Foreground),
228 (SynchronizeBuffers, Foreground),
229 (SynchronizeBuffersResponse, Foreground),
230 (Test, Foreground),
231 (Unfollow, Foreground),
232 (UnshareProject, Foreground),
233 (UpdateBuffer, Foreground),
234 (UpdateBufferFile, Foreground),
235 (UpdateContacts, Foreground),
236 (RemoveChannel, Foreground),
237 (UpdateChannels, Foreground),
238 (UpdateDiagnosticSummary, Foreground),
239 (UpdateFollowers, Foreground),
240 (UpdateInviteInfo, Foreground),
241 (UpdateLanguageServer, Foreground),
242 (UpdateParticipantLocation, Foreground),
243 (UpdateProject, Foreground),
244 (UpdateProjectCollaborator, Foreground),
245 (UpdateWorktree, Foreground),
246 (UpdateWorktreeSettings, Foreground),
247 (UpdateDiffBase, Foreground),
248 (GetPrivateUserInfo, Foreground),
249 (GetPrivateUserInfoResponse, Foreground),
250 (GetChannelMembers, Foreground),
251 (GetChannelMembersResponse, Foreground),
252 (JoinChannelBuffer, Foreground),
253 (JoinChannelBufferResponse, Foreground),
254 (LeaveChannelBuffer, Background),
255 (UpdateChannelBuffer, Foreground),
256 (RemoveChannelBufferCollaborator, Foreground),
257 (AddChannelBufferCollaborator, Foreground),
258);
259
260request_messages!(
261 (ApplyCodeAction, ApplyCodeActionResponse),
262 (
263 ApplyCompletionAdditionalEdits,
264 ApplyCompletionAdditionalEditsResponse
265 ),
266 (Call, Ack),
267 (CancelCall, Ack),
268 (CopyProjectEntry, ProjectEntryResponse),
269 (CreateProjectEntry, ProjectEntryResponse),
270 (CreateRoom, CreateRoomResponse),
271 (CreateChannel, ChannelResponse),
272 (DeclineCall, Ack),
273 (DeleteProjectEntry, ProjectEntryResponse),
274 (ExpandProjectEntry, ExpandProjectEntryResponse),
275 (Follow, FollowResponse),
276 (FormatBuffers, FormatBuffersResponse),
277 (GetCodeActions, GetCodeActionsResponse),
278 (GetHover, GetHoverResponse),
279 (GetCompletions, GetCompletionsResponse),
280 (GetDefinition, GetDefinitionResponse),
281 (GetTypeDefinition, GetTypeDefinitionResponse),
282 (GetDocumentHighlights, GetDocumentHighlightsResponse),
283 (GetReferences, GetReferencesResponse),
284 (GetPrivateUserInfo, GetPrivateUserInfoResponse),
285 (GetProjectSymbols, GetProjectSymbolsResponse),
286 (FuzzySearchUsers, UsersResponse),
287 (GetUsers, UsersResponse),
288 (InviteChannelMember, Ack),
289 (JoinProject, JoinProjectResponse),
290 (JoinRoom, JoinRoomResponse),
291 (LeaveRoom, Ack),
292 (RejoinRoom, RejoinRoomResponse),
293 (IncomingCall, Ack),
294 (OpenBufferById, OpenBufferResponse),
295 (OpenBufferByPath, OpenBufferResponse),
296 (OpenBufferForSymbol, OpenBufferForSymbolResponse),
297 (Ping, Ack),
298 (PerformRename, PerformRenameResponse),
299 (PrepareRename, PrepareRenameResponse),
300 (OnTypeFormatting, OnTypeFormattingResponse),
301 (InlayHints, InlayHintsResponse),
302 (RefreshInlayHints, Ack),
303 (ReloadBuffers, ReloadBuffersResponse),
304 (RequestContact, Ack),
305 (RemoveChannelMember, Ack),
306 (RemoveContact, Ack),
307 (RespondToContactRequest, Ack),
308 (RespondToChannelInvite, Ack),
309 (SetChannelMemberAdmin, Ack),
310 (GetChannelMembers, GetChannelMembersResponse),
311 (JoinChannel, JoinRoomResponse),
312 (RemoveChannel, Ack),
313 (RenameProjectEntry, ProjectEntryResponse),
314 (RenameChannel, ChannelResponse),
315 (SaveBuffer, BufferSaved),
316 (SearchProject, SearchProjectResponse),
317 (ShareProject, ShareProjectResponse),
318 (SynchronizeBuffers, SynchronizeBuffersResponse),
319 (Test, Test),
320 (UpdateBuffer, Ack),
321 (UpdateParticipantLocation, Ack),
322 (UpdateProject, Ack),
323 (UpdateWorktree, Ack),
324 (JoinChannelBuffer, JoinChannelBufferResponse),
325 (LeaveChannelBuffer, Ack)
326);
327
328entity_messages!(
329 project_id,
330 AddProjectCollaborator,
331 ApplyCodeAction,
332 ApplyCompletionAdditionalEdits,
333 BufferReloaded,
334 BufferSaved,
335 CopyProjectEntry,
336 CreateBufferForPeer,
337 CreateProjectEntry,
338 DeleteProjectEntry,
339 ExpandProjectEntry,
340 Follow,
341 FormatBuffers,
342 GetCodeActions,
343 GetCompletions,
344 GetDefinition,
345 GetTypeDefinition,
346 GetDocumentHighlights,
347 GetHover,
348 GetReferences,
349 GetProjectSymbols,
350 JoinProject,
351 LeaveProject,
352 OpenBufferById,
353 OpenBufferByPath,
354 OpenBufferForSymbol,
355 PerformRename,
356 OnTypeFormatting,
357 InlayHints,
358 RefreshInlayHints,
359 PrepareRename,
360 ReloadBuffers,
361 RemoveProjectCollaborator,
362 RenameProjectEntry,
363 SaveBuffer,
364 SearchProject,
365 StartLanguageServer,
366 SynchronizeBuffers,
367 Unfollow,
368 UnshareProject,
369 UpdateBuffer,
370 UpdateBufferFile,
371 UpdateDiagnosticSummary,
372 UpdateFollowers,
373 UpdateLanguageServer,
374 UpdateProject,
375 UpdateProjectCollaborator,
376 UpdateWorktree,
377 UpdateWorktreeSettings,
378 UpdateDiffBase
379);
380
381entity_messages!(
382 channel_id,
383 UpdateChannelBuffer,
384 RemoveChannelBufferCollaborator,
385 AddChannelBufferCollaborator
386);
387
388const KIB: usize = 1024;
389const MIB: usize = KIB * 1024;
390const MAX_BUFFER_LEN: usize = MIB;
391
392/// A stream of protobuf messages.
393pub struct MessageStream<S> {
394 stream: S,
395 encoding_buffer: Vec<u8>,
396}
397
398#[allow(clippy::large_enum_variant)]
399#[derive(Debug)]
400pub enum Message {
401 Envelope(Envelope),
402 Ping,
403 Pong,
404}
405
406impl<S> MessageStream<S> {
407 pub fn new(stream: S) -> Self {
408 Self {
409 stream,
410 encoding_buffer: Vec::new(),
411 }
412 }
413
414 pub fn inner_mut(&mut self) -> &mut S {
415 &mut self.stream
416 }
417}
418
419impl<S> MessageStream<S>
420where
421 S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
422{
423 pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
424 #[cfg(any(test, feature = "test-support"))]
425 const COMPRESSION_LEVEL: i32 = -7;
426
427 #[cfg(not(any(test, feature = "test-support")))]
428 const COMPRESSION_LEVEL: i32 = 4;
429
430 match message {
431 Message::Envelope(message) => {
432 self.encoding_buffer.reserve(message.encoded_len());
433 message
434 .encode(&mut self.encoding_buffer)
435 .map_err(io::Error::from)?;
436 let buffer =
437 zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
438 .unwrap();
439
440 self.encoding_buffer.clear();
441 self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
442 self.stream.send(WebSocketMessage::Binary(buffer)).await?;
443 }
444 Message::Ping => {
445 self.stream
446 .send(WebSocketMessage::Ping(Default::default()))
447 .await?;
448 }
449 Message::Pong => {
450 self.stream
451 .send(WebSocketMessage::Pong(Default::default()))
452 .await?;
453 }
454 }
455
456 Ok(())
457 }
458}
459
460impl<S> MessageStream<S>
461where
462 S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
463{
464 pub async fn read(&mut self) -> Result<Message, anyhow::Error> {
465 while let Some(bytes) = self.stream.next().await {
466 match bytes? {
467 WebSocketMessage::Binary(bytes) => {
468 zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
469 let envelope = Envelope::decode(self.encoding_buffer.as_slice())
470 .map_err(io::Error::from)?;
471
472 self.encoding_buffer.clear();
473 self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
474 return Ok(Message::Envelope(envelope));
475 }
476 WebSocketMessage::Ping(_) => return Ok(Message::Ping),
477 WebSocketMessage::Pong(_) => return Ok(Message::Pong),
478 WebSocketMessage::Close(_) => break,
479 _ => {}
480 }
481 }
482 Err(anyhow!("connection closed"))
483 }
484}
485
486impl From<Timestamp> for SystemTime {
487 fn from(val: Timestamp) -> Self {
488 UNIX_EPOCH
489 .checked_add(Duration::new(val.seconds, val.nanos))
490 .unwrap()
491 }
492}
493
494impl From<SystemTime> for Timestamp {
495 fn from(time: SystemTime) -> Self {
496 let duration = time.duration_since(UNIX_EPOCH).unwrap();
497 Self {
498 seconds: duration.as_secs(),
499 nanos: duration.subsec_nanos(),
500 }
501 }
502}
503
504impl From<u128> for Nonce {
505 fn from(nonce: u128) -> Self {
506 let upper_half = (nonce >> 64) as u64;
507 let lower_half = nonce as u64;
508 Self {
509 upper_half,
510 lower_half,
511 }
512 }
513}
514
515impl From<Nonce> for u128 {
516 fn from(nonce: Nonce) -> Self {
517 let upper_half = (nonce.upper_half as u128) << 64;
518 let lower_half = nonce.lower_half as u128;
519 upper_half | lower_half
520 }
521}
522
523pub fn split_worktree_update(
524 mut message: UpdateWorktree,
525 max_chunk_size: usize,
526) -> impl Iterator<Item = UpdateWorktree> {
527 let mut done_files = false;
528
529 let mut repository_map = message
530 .updated_repositories
531 .into_iter()
532 .map(|repo| (repo.work_directory_id, repo))
533 .collect::<HashMap<_, _>>();
534
535 iter::from_fn(move || {
536 if done_files {
537 return None;
538 }
539
540 let updated_entries_chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
541 let updated_entries: Vec<_> = message
542 .updated_entries
543 .drain(..updated_entries_chunk_size)
544 .collect();
545
546 let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size);
547 let removed_entries = message
548 .removed_entries
549 .drain(..removed_entries_chunk_size)
550 .collect();
551
552 done_files = message.updated_entries.is_empty() && message.removed_entries.is_empty();
553
554 let mut updated_repositories = Vec::new();
555
556 if !repository_map.is_empty() {
557 for entry in &updated_entries {
558 if let Some(repo) = repository_map.remove(&entry.id) {
559 updated_repositories.push(repo)
560 }
561 }
562 }
563
564 let removed_repositories = if done_files {
565 mem::take(&mut message.removed_repositories)
566 } else {
567 Default::default()
568 };
569
570 if done_files {
571 updated_repositories.extend(mem::take(&mut repository_map).into_values());
572 }
573
574 Some(UpdateWorktree {
575 project_id: message.project_id,
576 worktree_id: message.worktree_id,
577 root_name: message.root_name.clone(),
578 abs_path: message.abs_path.clone(),
579 updated_entries,
580 removed_entries,
581 scan_id: message.scan_id,
582 is_last_update: done_files && message.is_last_update,
583 updated_repositories,
584 removed_repositories,
585 })
586 })
587}
588
589#[cfg(test)]
590mod tests {
591 use super::*;
592
593 #[gpui::test]
594 async fn test_buffer_size() {
595 let (tx, rx) = futures::channel::mpsc::unbounded();
596 let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
597 sink.write(Message::Envelope(Envelope {
598 payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
599 root_name: "abcdefg".repeat(10),
600 ..Default::default()
601 })),
602 ..Default::default()
603 }))
604 .await
605 .unwrap();
606 assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
607 sink.write(Message::Envelope(Envelope {
608 payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
609 root_name: "abcdefg".repeat(1000000),
610 ..Default::default()
611 })),
612 ..Default::default()
613 }))
614 .await
615 .unwrap();
616 assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
617
618 let mut stream = MessageStream::new(rx.map(anyhow::Ok));
619 stream.read().await.unwrap();
620 assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
621 stream.read().await.unwrap();
622 assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
623 }
624
625 #[gpui::test]
626 fn test_converting_peer_id_from_and_to_u64() {
627 let peer_id = PeerId {
628 owner_id: 10,
629 id: 3,
630 };
631 assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
632 let peer_id = PeerId {
633 owner_id: u32::MAX,
634 id: 3,
635 };
636 assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
637 let peer_id = PeerId {
638 owner_id: 10,
639 id: u32::MAX,
640 };
641 assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
642 let peer_id = PeerId {
643 owner_id: u32::MAX,
644 id: u32::MAX,
645 };
646 assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
647 }
648}