1#![allow(non_snake_case)]
2
3use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
4use anyhow::{anyhow, Result};
5use async_tungstenite::tungstenite::Message as WebSocketMessage;
6use collections::HashMap;
7use futures::{SinkExt as _, StreamExt as _};
8use prost::Message as _;
9use serde::Serialize;
10use std::any::{Any, TypeId};
11use std::{
12 cmp,
13 fmt::Debug,
14 io, iter,
15 time::{Duration, SystemTime, UNIX_EPOCH},
16};
17use std::{fmt, mem};
18
// Textually includes `zed.messages.rs` from the build's `OUT_DIR`.
// NOTE(review): presumably this is the prost-generated code that declares the
// protobuf types used throughout this file (`Envelope`, `PeerId`, `Timestamp`,
// `Nonce`, `UpdateWorktree`, ...) — confirm against the crate's build script.
include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
20
/// A message type that can be packed into, and recovered from, an [`Envelope`].
///
/// NOTE(review): no implementation is written by hand in this file; the impls
/// are presumably generated by the `messages!` invocation further down —
/// confirm against the macro definition in the parent module.
pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
    /// Name of the message type; reported by
    /// [`AnyTypedEnvelope::payload_type_name`].
    const NAME: &'static str;
    /// Priority class of the message; consulted by
    /// [`AnyTypedEnvelope::is_background`].
    const PRIORITY: MessagePriority;
    /// Consumes the message and wraps it in an [`Envelope`].
    ///
    /// * `id` - id to give the envelope.
    /// * `responding_to` - id of the message this one answers, if any.
    /// * `original_sender_id` - peer that originally sent the message, if any.
    fn into_envelope(
        self,
        id: u32,
        responding_to: Option<u32>,
        original_sender_id: Option<PeerId>,
    ) -> Envelope;
    /// Tries to take a message of this type out of `envelope`.
    ///
    /// Returns `None` when no `Self` can be produced (presumably because the
    /// envelope carries a different payload type — confirm in the macro).
    fn from_envelope(envelope: Envelope) -> Option<Self>;
}
32
/// An [`EnvelopedMessage`] that is addressed to a particular remote entity.
pub trait EntityMessage: EnvelopedMessage {
    /// Returns the id of the remote entity this message refers to.
    ///
    /// NOTE(review): the `entity_messages!` invocation in this file passes
    /// `project_id`, so this is presumably the project id for every message
    /// listed there — confirm against the macro definition.
    fn remote_entity_id(&self) -> u64;
}
36
/// An [`EnvelopedMessage`] that expects a reply.
pub trait RequestMessage: EnvelopedMessage {
    /// The message type sent back in answer to this request.
    type Response: EnvelopedMessage;
}
40
/// Object-safe view of a `TypedEnvelope<T>` that hides the payload type `T`.
///
/// Implemented in this file for every `TypedEnvelope<T>` whose `T` is an
/// [`EnvelopedMessage`].
pub trait AnyTypedEnvelope: 'static + Send + Sync {
    /// [`TypeId`] of the payload type.
    fn payload_type_id(&self) -> TypeId;
    /// Name of the payload type (its [`EnvelopedMessage::NAME`]).
    fn payload_type_name(&self) -> &'static str;
    /// Borrows the envelope as [`Any`] so it can be downcast by reference.
    fn as_any(&self) -> &dyn Any;
    /// Converts the boxed envelope into a boxed [`Any`] so it can be downcast by value.
    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
    /// Whether the payload's priority is [`MessagePriority::Background`].
    fn is_background(&self) -> bool;
    /// The envelope's `original_sender_id`, if it has one.
    fn original_sender_id(&self) -> Option<PeerId>;
    /// The envelope's `sender_id`.
    fn sender_id(&self) -> ConnectionId;
    /// The envelope's `message_id`.
    fn message_id(&self) -> u32;
}
51
/// Priority class attached to every message type via
/// [`EnvelopedMessage::PRIORITY`].
///
/// NOTE(review): how the two classes are scheduled is decided by code outside
/// this file; here the value is only surfaced through
/// [`AnyTypedEnvelope::is_background`].
pub enum MessagePriority {
    /// The message is not a background message.
    Foreground,
    /// The message is a background message.
    Background,
}
56
57impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
58 fn payload_type_id(&self) -> TypeId {
59 TypeId::of::<T>()
60 }
61
62 fn payload_type_name(&self) -> &'static str {
63 T::NAME
64 }
65
66 fn as_any(&self) -> &dyn Any {
67 self
68 }
69
70 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
71 self
72 }
73
74 fn is_background(&self) -> bool {
75 matches!(T::PRIORITY, MessagePriority::Background)
76 }
77
78 fn original_sender_id(&self) -> Option<PeerId> {
79 self.original_sender_id
80 }
81
82 fn sender_id(&self) -> ConnectionId {
83 self.sender_id
84 }
85
86 fn message_id(&self) -> u32 {
87 self.message_id
88 }
89}
90
91impl PeerId {
92 pub fn from_u64(peer_id: u64) -> Self {
93 let owner_id = (peer_id >> 32) as u32;
94 let id = peer_id as u32;
95 Self { owner_id, id }
96 }
97
98 pub fn as_u64(self) -> u64 {
99 ((self.owner_id as u64) << 32) | (self.id as u64)
100 }
101}
102
// `PeerId` is built from two `u32` fields (see `PeerId::from_u64`), so it can
// be `Copy`. NOTE(review): the required `Clone` impl is not in this file —
// presumably derived in the generated code.
impl Copy for PeerId {}
104
// Marks `PeerId` equality as a full equivalence relation, as required by the
// `Ord` impl in this file. NOTE(review): the underlying `PartialEq` impl is
// not in this file — presumably derived in the generated code.
impl Eq for PeerId {}
106
107impl Ord for PeerId {
108 fn cmp(&self, other: &Self) -> cmp::Ordering {
109 self.owner_id
110 .cmp(&other.owner_id)
111 .then_with(|| self.id.cmp(&other.id))
112 }
113}
114
115impl PartialOrd for PeerId {
116 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
117 Some(self.cmp(other))
118 }
119}
120
121impl std::hash::Hash for PeerId {
122 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
123 self.owner_id.hash(state);
124 self.id.hash(state);
125 }
126}
127
128impl fmt::Display for PeerId {
129 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130 write!(f, "{}/{}", self.owner_id, self.id)
131 }
132}
133
// Lists every message type together with the name of a `MessagePriority`
// variant (`Foreground` or `Background`).
// NOTE(review): `messages!` is imported from the parent module; presumably it
// implements `EnvelopedMessage` for each `(Type, Priority)` pair — confirm
// against the macro definition.
messages!(
    (Ack, Foreground),
    (AddProjectCollaborator, Foreground),
    (ApplyCodeAction, Background),
    (ApplyCodeActionResponse, Background),
    (ApplyCompletionAdditionalEdits, Background),
    (ApplyCompletionAdditionalEditsResponse, Background),
    (BufferReloaded, Foreground),
    (BufferSaved, Foreground),
    (Call, Foreground),
    (CallCanceled, Foreground),
    (CancelCall, Foreground),
    (CopyProjectEntry, Foreground),
    (CreateBufferForPeer, Foreground),
    (CreateChannel, Foreground),
    (CreateChannelResponse, Foreground),
    (CreateProjectEntry, Foreground),
    (CreateRoom, Foreground),
    (CreateRoomResponse, Foreground),
    (DeclineCall, Foreground),
    (DeleteProjectEntry, Foreground),
    (Error, Foreground),
    (ExpandProjectEntry, Foreground),
    (Follow, Foreground),
    (FollowResponse, Foreground),
    (FormatBuffers, Foreground),
    (FormatBuffersResponse, Foreground),
    (FuzzySearchUsers, Foreground),
    (GetCodeActions, Background),
    (GetCodeActionsResponse, Background),
    (GetHover, Background),
    (GetHoverResponse, Background),
    (GetCompletions, Background),
    (GetCompletionsResponse, Background),
    (GetDefinition, Background),
    (GetDefinitionResponse, Background),
    (GetTypeDefinition, Background),
    (GetTypeDefinitionResponse, Background),
    (GetDocumentHighlights, Background),
    (GetDocumentHighlightsResponse, Background),
    (GetReferences, Background),
    (GetReferencesResponse, Background),
    (GetProjectSymbols, Background),
    (GetProjectSymbolsResponse, Background),
    (GetUsers, Foreground),
    (Hello, Foreground),
    (IncomingCall, Foreground),
    (InviteChannelMember, Foreground),
    (UsersResponse, Foreground),
    (JoinProject, Foreground),
    (JoinProjectResponse, Foreground),
    (JoinRoom, Foreground),
    (JoinRoomResponse, Foreground),
    (LeaveProject, Foreground),
    (LeaveRoom, Foreground),
    (OpenBufferById, Background),
    (OpenBufferByPath, Background),
    (OpenBufferForSymbol, Background),
    (OpenBufferForSymbolResponse, Background),
    (OpenBufferResponse, Background),
    (PerformRename, Background),
    (PerformRenameResponse, Background),
    (OnTypeFormatting, Background),
    (OnTypeFormattingResponse, Background),
    (InlayHints, Background),
    (InlayHintsResponse, Background),
    (RefreshInlayHints, Foreground),
    (Ping, Foreground),
    (PrepareRename, Background),
    (PrepareRenameResponse, Background),
    (ExpandProjectEntryResponse, Foreground),
    (ProjectEntryResponse, Foreground),
    (RejoinRoom, Foreground),
    (RejoinRoomResponse, Foreground),
    (RemoveContact, Foreground),
    (RemoveChannelMember, Foreground),
    (ReloadBuffers, Foreground),
    (ReloadBuffersResponse, Foreground),
    (RemoveProjectCollaborator, Foreground),
    (RenameProjectEntry, Foreground),
    (RequestContact, Foreground),
    (RespondToContactRequest, Foreground),
    (RespondToChannelInvite, Foreground),
    (JoinChannel, Foreground),
    (RoomUpdated, Foreground),
    (SaveBuffer, Foreground),
    (SearchProject, Background),
    (SearchProjectResponse, Background),
    (ShareProject, Foreground),
    (ShareProjectResponse, Foreground),
    (ShowContacts, Foreground),
    (StartLanguageServer, Foreground),
    (SynchronizeBuffers, Foreground),
    (SynchronizeBuffersResponse, Foreground),
    (Test, Foreground),
    (Unfollow, Foreground),
    (UnshareProject, Foreground),
    (UpdateBuffer, Foreground),
    (UpdateBufferFile, Foreground),
    (UpdateContacts, Foreground),
    (RemoveChannel, Foreground),
    (UpdateChannels, Foreground),
    (UpdateDiagnosticSummary, Foreground),
    (UpdateFollowers, Foreground),
    (UpdateInviteInfo, Foreground),
    (UpdateLanguageServer, Foreground),
    (UpdateParticipantLocation, Foreground),
    (UpdateProject, Foreground),
    (UpdateProjectCollaborator, Foreground),
    (UpdateWorktree, Foreground),
    (UpdateWorktreeSettings, Foreground),
    (UpdateDiffBase, Foreground),
    (GetPrivateUserInfo, Foreground),
    (GetPrivateUserInfoResponse, Foreground),
);
249
// Pairs each request message type with the message type sent in response.
// NOTE(review): `request_messages!` is imported from the parent module;
// presumably it implements `RequestMessage` for the first type of each pair
// with the second type as `Response` — confirm against the macro definition.
request_messages!(
    (ApplyCodeAction, ApplyCodeActionResponse),
    (
        ApplyCompletionAdditionalEdits,
        ApplyCompletionAdditionalEditsResponse
    ),
    (Call, Ack),
    (CancelCall, Ack),
    (CopyProjectEntry, ProjectEntryResponse),
    (CreateProjectEntry, ProjectEntryResponse),
    (CreateRoom, CreateRoomResponse),
    (CreateChannel, CreateChannelResponse),
    (DeclineCall, Ack),
    (DeleteProjectEntry, ProjectEntryResponse),
    (ExpandProjectEntry, ExpandProjectEntryResponse),
    (Follow, FollowResponse),
    (FormatBuffers, FormatBuffersResponse),
    (GetCodeActions, GetCodeActionsResponse),
    (GetHover, GetHoverResponse),
    (GetCompletions, GetCompletionsResponse),
    (GetDefinition, GetDefinitionResponse),
    (GetTypeDefinition, GetTypeDefinitionResponse),
    (GetDocumentHighlights, GetDocumentHighlightsResponse),
    (GetReferences, GetReferencesResponse),
    (GetPrivateUserInfo, GetPrivateUserInfoResponse),
    (GetProjectSymbols, GetProjectSymbolsResponse),
    (FuzzySearchUsers, UsersResponse),
    (GetUsers, UsersResponse),
    (InviteChannelMember, Ack),
    (JoinProject, JoinProjectResponse),
    (JoinRoom, JoinRoomResponse),
    (LeaveRoom, Ack),
    (RejoinRoom, RejoinRoomResponse),
    (IncomingCall, Ack),
    (OpenBufferById, OpenBufferResponse),
    (OpenBufferByPath, OpenBufferResponse),
    (OpenBufferForSymbol, OpenBufferForSymbolResponse),
    (Ping, Ack),
    (PerformRename, PerformRenameResponse),
    (PrepareRename, PrepareRenameResponse),
    (OnTypeFormatting, OnTypeFormattingResponse),
    (InlayHints, InlayHintsResponse),
    (RefreshInlayHints, Ack),
    (ReloadBuffers, ReloadBuffersResponse),
    (RequestContact, Ack),
    (RemoveChannelMember, Ack),
    (RemoveContact, Ack),
    (RespondToContactRequest, Ack),
    (RespondToChannelInvite, Ack),
    (JoinChannel, JoinRoomResponse),
    (RemoveChannel, Ack),
    (RenameProjectEntry, ProjectEntryResponse),
    (SaveBuffer, BufferSaved),
    (SearchProject, SearchProjectResponse),
    (ShareProject, ShareProjectResponse),
    (SynchronizeBuffers, SynchronizeBuffersResponse),
    (Test, Test),
    (UpdateBuffer, Ack),
    (UpdateParticipantLocation, Ack),
    (UpdateProject, Ack),
    (UpdateWorktree, Ack),
);
312
// Lists the message types that are tied to a project; the first argument,
// `project_id`, names a field rather than a message type.
// NOTE(review): `entity_messages!` is imported from the parent module;
// presumably it implements `EntityMessage` for each listed type with
// `remote_entity_id` returning that field — confirm against the macro
// definition.
entity_messages!(
    project_id,
    AddProjectCollaborator,
    ApplyCodeAction,
    ApplyCompletionAdditionalEdits,
    BufferReloaded,
    BufferSaved,
    CopyProjectEntry,
    CreateBufferForPeer,
    CreateProjectEntry,
    DeleteProjectEntry,
    ExpandProjectEntry,
    Follow,
    FormatBuffers,
    GetCodeActions,
    GetCompletions,
    GetDefinition,
    GetTypeDefinition,
    GetDocumentHighlights,
    GetHover,
    GetReferences,
    GetProjectSymbols,
    JoinProject,
    LeaveProject,
    OpenBufferById,
    OpenBufferByPath,
    OpenBufferForSymbol,
    PerformRename,
    OnTypeFormatting,
    InlayHints,
    RefreshInlayHints,
    PrepareRename,
    ReloadBuffers,
    RemoveProjectCollaborator,
    RenameProjectEntry,
    SaveBuffer,
    SearchProject,
    StartLanguageServer,
    SynchronizeBuffers,
    Unfollow,
    UnshareProject,
    UpdateBuffer,
    UpdateBufferFile,
    UpdateDiagnosticSummary,
    UpdateFollowers,
    UpdateLanguageServer,
    UpdateProject,
    UpdateProjectCollaborator,
    UpdateWorktree,
    UpdateWorktreeSettings,
    UpdateDiffBase
);
365
/// Bytes in one kibibyte.
const KIB: usize = 1024;
/// Bytes in one mebibyte.
const MIB: usize = KIB * 1024;
/// Capacity, in bytes, that `MessageStream`'s scratch buffer is shrunk back
/// towards after each envelope is written or read, so that a single large
/// message does not keep a large allocation alive.
const MAX_BUFFER_LEN: usize = MIB;
369
/// A stream of protobuf messages.
///
/// Wraps an underlying WebSocket sink and/or stream `S`; envelopes are
/// protobuf-encoded and zstd-compressed on the way out and the reverse on the
/// way in.
pub struct MessageStream<S> {
    /// The wrapped transport.
    stream: S,
    /// Scratch buffer holding the uncompressed protobuf bytes of the envelope
    /// currently being written or read; reused across calls.
    encoding_buffer: Vec<u8>,
}
375
/// One unit of traffic on a `MessageStream`.
// `Envelope` is much larger than the unit variants; the lint is silenced
// rather than boxing the payload.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum Message {
    /// A protobuf envelope, carried in a binary WebSocket frame.
    Envelope(Envelope),
    /// A WebSocket ping frame.
    Ping,
    /// A WebSocket pong frame.
    Pong,
}
383
384impl<S> MessageStream<S> {
385 pub fn new(stream: S) -> Self {
386 Self {
387 stream,
388 encoding_buffer: Vec::new(),
389 }
390 }
391
392 pub fn inner_mut(&mut self) -> &mut S {
393 &mut self.stream
394 }
395}
396
397impl<S> MessageStream<S>
398where
399 S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
400{
401 pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
402 #[cfg(any(test, feature = "test-support"))]
403 const COMPRESSION_LEVEL: i32 = -7;
404
405 #[cfg(not(any(test, feature = "test-support")))]
406 const COMPRESSION_LEVEL: i32 = 4;
407
408 match message {
409 Message::Envelope(message) => {
410 self.encoding_buffer.reserve(message.encoded_len());
411 message
412 .encode(&mut self.encoding_buffer)
413 .map_err(io::Error::from)?;
414 let buffer =
415 zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
416 .unwrap();
417
418 self.encoding_buffer.clear();
419 self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
420 self.stream.send(WebSocketMessage::Binary(buffer)).await?;
421 }
422 Message::Ping => {
423 self.stream
424 .send(WebSocketMessage::Ping(Default::default()))
425 .await?;
426 }
427 Message::Pong => {
428 self.stream
429 .send(WebSocketMessage::Pong(Default::default()))
430 .await?;
431 }
432 }
433
434 Ok(())
435 }
436}
437
438impl<S> MessageStream<S>
439where
440 S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
441{
442 pub async fn read(&mut self) -> Result<Message, anyhow::Error> {
443 while let Some(bytes) = self.stream.next().await {
444 match bytes? {
445 WebSocketMessage::Binary(bytes) => {
446 zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
447 let envelope = Envelope::decode(self.encoding_buffer.as_slice())
448 .map_err(io::Error::from)?;
449
450 self.encoding_buffer.clear();
451 self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
452 return Ok(Message::Envelope(envelope));
453 }
454 WebSocketMessage::Ping(_) => return Ok(Message::Ping),
455 WebSocketMessage::Pong(_) => return Ok(Message::Pong),
456 WebSocketMessage::Close(_) => break,
457 _ => {}
458 }
459 }
460 Err(anyhow!("connection closed"))
461 }
462}
463
464impl From<Timestamp> for SystemTime {
465 fn from(val: Timestamp) -> Self {
466 UNIX_EPOCH
467 .checked_add(Duration::new(val.seconds, val.nanos))
468 .unwrap()
469 }
470}
471
472impl From<SystemTime> for Timestamp {
473 fn from(time: SystemTime) -> Self {
474 let duration = time.duration_since(UNIX_EPOCH).unwrap();
475 Self {
476 seconds: duration.as_secs(),
477 nanos: duration.subsec_nanos(),
478 }
479 }
480}
481
482impl From<u128> for Nonce {
483 fn from(nonce: u128) -> Self {
484 let upper_half = (nonce >> 64) as u64;
485 let lower_half = nonce as u64;
486 Self {
487 upper_half,
488 lower_half,
489 }
490 }
491}
492
493impl From<Nonce> for u128 {
494 fn from(nonce: Nonce) -> Self {
495 let upper_half = (nonce.upper_half as u128) << 64;
496 let lower_half = nonce.lower_half as u128;
497 upper_half | lower_half
498 }
499}
500
501pub fn split_worktree_update(
502 mut message: UpdateWorktree,
503 max_chunk_size: usize,
504) -> impl Iterator<Item = UpdateWorktree> {
505 let mut done_files = false;
506
507 let mut repository_map = message
508 .updated_repositories
509 .into_iter()
510 .map(|repo| (repo.work_directory_id, repo))
511 .collect::<HashMap<_, _>>();
512
513 iter::from_fn(move || {
514 if done_files {
515 return None;
516 }
517
518 let updated_entries_chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
519 let updated_entries: Vec<_> = message
520 .updated_entries
521 .drain(..updated_entries_chunk_size)
522 .collect();
523
524 let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size);
525 let removed_entries = message
526 .removed_entries
527 .drain(..removed_entries_chunk_size)
528 .collect();
529
530 done_files = message.updated_entries.is_empty() && message.removed_entries.is_empty();
531
532 let mut updated_repositories = Vec::new();
533
534 if !repository_map.is_empty() {
535 for entry in &updated_entries {
536 if let Some(repo) = repository_map.remove(&entry.id) {
537 updated_repositories.push(repo)
538 }
539 }
540 }
541
542 let removed_repositories = if done_files {
543 mem::take(&mut message.removed_repositories)
544 } else {
545 Default::default()
546 };
547
548 if done_files {
549 updated_repositories.extend(mem::take(&mut repository_map).into_values());
550 }
551
552 Some(UpdateWorktree {
553 project_id: message.project_id,
554 worktree_id: message.worktree_id,
555 root_name: message.root_name.clone(),
556 abs_path: message.abs_path.clone(),
557 updated_entries,
558 removed_entries,
559 scan_id: message.scan_id,
560 is_last_update: done_files && message.is_last_update,
561 updated_repositories,
562 removed_repositories,
563 })
564 })
565}
566
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an `UpdateWorktree` envelope whose root name is "abcdefg"
    /// repeated `repetitions` times.
    fn worktree_envelope(repetitions: usize) -> Message {
        Message::Envelope(Envelope {
            payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
                root_name: "abcdefg".repeat(repetitions),
                ..Default::default()
            })),
            ..Default::default()
        })
    }

    /// The scratch buffer must never keep more than `MAX_BUFFER_LEN` bytes of
    /// capacity, after a small message or a very large one, on either side of
    /// the connection.
    #[gpui::test]
    async fn test_buffer_size() {
        let (tx, rx) = futures::channel::mpsc::unbounded();

        let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
        for repetitions in [10, 1000000] {
            sink.write(worktree_envelope(repetitions)).await.unwrap();
            assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
        }

        let mut stream = MessageStream::new(rx.map(anyhow::Ok));
        for _ in 0..2 {
            stream.read().await.unwrap();
            assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
        }
    }

    /// `PeerId` must survive a round trip through its `u64` form, including
    /// at the extremes of either field.
    #[gpui::test]
    fn test_converting_peer_id_from_and_to_u64() {
        let cases = [
            (10, 3),
            (u32::MAX, 3),
            (10, u32::MAX),
            (u32::MAX, u32::MAX),
        ];
        for (owner_id, id) in cases {
            let peer_id = PeerId { owner_id, id };
            assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
        }
    }
}