1#![allow(non_snake_case)]
2
3use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
4use anyhow::{anyhow, Result};
5use async_tungstenite::tungstenite::Message as WebSocketMessage;
6use collections::HashMap;
7use futures::{SinkExt as _, StreamExt as _};
8use prost::Message as _;
9use serde::Serialize;
10use std::any::{Any, TypeId};
11use std::{
12 cmp,
13 fmt::Debug,
14 io, iter,
15 time::{Duration, SystemTime, UNIX_EPOCH},
16};
17use std::{fmt, mem};
18
// Pulls in Rust source generated at build time into `$OUT_DIR/zed.messages.rs`.
// NOTE(review): presumably prost output for the protobuf schema, which would be where `Envelope`,
// `PeerId`, `Timestamp`, `Nonce`, `UpdateWorktree` and the other message structs used below come
// from — confirm against the build script.
include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
20
/// A message type that can be wrapped in, and extracted from, an [`Envelope`].
///
/// NOTE(review): implementations are presumably generated by the `messages!` macro invoked later
/// in this file; the macro definition is not visible here — confirm.
pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
    /// Name of the message type; surfaced through `AnyTypedEnvelope::payload_type_name`.
    const NAME: &'static str;
    /// Priority class of the message; `AnyTypedEnvelope::is_background` reports whether this is
    /// `MessagePriority::Background`.
    const PRIORITY: MessagePriority;
    /// Consumes the message and wraps it in an [`Envelope`].
    ///
    /// NOTE(review): `id`, `responding_to` and `original_sender_id` presumably populate the
    /// envelope fields of the same names — confirm against the implementing macro.
    fn into_envelope(
        self,
        id: u32,
        responding_to: Option<u32>,
        original_sender_id: Option<PeerId>,
    ) -> Envelope;
    /// Attempts to extract a message of this type from `envelope`.
    ///
    /// NOTE(review): `None` presumably means the envelope carries a different payload type —
    /// confirm against the implementing macro.
    fn from_envelope(envelope: Envelope) -> Option<Self>;
}
32
/// An [`EnvelopedMessage`] that is addressed to a particular remote entity.
///
/// NOTE(review): implementations presumably come from the `entity_messages!` invocations later in
/// this file, keyed on `project_id` or `channel_id` — confirm against the macro definition.
pub trait EntityMessage: EnvelopedMessage {
    /// Returns the id of the remote entity this message refers to.
    fn remote_entity_id(&self) -> u64;
}
36
/// An [`EnvelopedMessage`] that expects a reply of a specific type.
///
/// NOTE(review): implementations presumably come from the `request_messages!` invocation later in
/// this file, which pairs each request with its response — confirm against the macro definition.
pub trait RequestMessage: EnvelopedMessage {
    /// The message type sent back in response to this request.
    type Response: EnvelopedMessage;
}
40
/// Type-erased view of a `TypedEnvelope<T>`, usable without knowing the payload type `T`.
pub trait AnyTypedEnvelope: 'static + Send + Sync {
    /// `TypeId` of the payload type.
    fn payload_type_id(&self) -> TypeId;
    /// Name of the payload's message type (`EnvelopedMessage::NAME`).
    fn payload_type_name(&self) -> &'static str;
    /// Borrows the envelope as `&dyn Any` so it can be downcast to its concrete type.
    fn as_any(&self) -> &dyn Any;
    /// Converts the boxed envelope into a boxed `Any` so it can be downcast by value.
    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
    /// Whether the payload's priority is `MessagePriority::Background`.
    fn is_background(&self) -> bool;
    /// The envelope's `original_sender_id`, if any.
    fn original_sender_id(&self) -> Option<PeerId>;
    /// The envelope's `sender_id` (a connection id).
    fn sender_id(&self) -> ConnectionId;
    /// The envelope's `message_id`.
    fn message_id(&self) -> u32;
}
51
/// Priority class attached to every message type via `EnvelopedMessage::PRIORITY`.
///
/// `AnyTypedEnvelope::is_background` reports whether a message's priority is
/// [`MessagePriority::Background`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MessagePriority {
    /// The default class; any message that is not `Background`.
    Foreground,
    /// Messages for which `AnyTypedEnvelope::is_background` returns `true`.
    Background,
}
56
57impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
58 fn payload_type_id(&self) -> TypeId {
59 TypeId::of::<T>()
60 }
61
62 fn payload_type_name(&self) -> &'static str {
63 T::NAME
64 }
65
66 fn as_any(&self) -> &dyn Any {
67 self
68 }
69
70 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
71 self
72 }
73
74 fn is_background(&self) -> bool {
75 matches!(T::PRIORITY, MessagePriority::Background)
76 }
77
78 fn original_sender_id(&self) -> Option<PeerId> {
79 self.original_sender_id
80 }
81
82 fn sender_id(&self) -> ConnectionId {
83 self.sender_id
84 }
85
86 fn message_id(&self) -> u32 {
87 self.message_id
88 }
89}
90
91impl PeerId {
92 pub fn from_u64(peer_id: u64) -> Self {
93 let owner_id = (peer_id >> 32) as u32;
94 let id = peer_id as u32;
95 Self { owner_id, id }
96 }
97
98 pub fn as_u64(self) -> u64 {
99 ((self.owner_id as u64) << 32) | (self.id as u64)
100 }
101}
102
// `PeerId` is built from two `u32` fields (see `PeerId::from_u64`), so it is cheap to copy.
// NOTE(review): `Copy` requires `Clone`, which is presumably derived in the generated code —
// confirm.
impl Copy for PeerId {}

// Marks `PeerId` equality as a full equivalence relation, as required by the `Ord` impl.
// NOTE(review): `PartialEq` is presumably derived in the generated code — confirm.
impl Eq for PeerId {}
106
107impl Ord for PeerId {
108 fn cmp(&self, other: &Self) -> cmp::Ordering {
109 self.owner_id
110 .cmp(&other.owner_id)
111 .then_with(|| self.id.cmp(&other.id))
112 }
113}
114
115impl PartialOrd for PeerId {
116 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
117 Some(self.cmp(other))
118 }
119}
120
121impl std::hash::Hash for PeerId {
122 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
123 self.owner_id.hash(state);
124 self.id.hash(state);
125 }
126}
127
128impl fmt::Display for PeerId {
129 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130 write!(f, "{}/{}", self.owner_id, self.id)
131 }
132}
133
// Registers every message type together with its priority (`Foreground` or `Background`, the
// variants of `MessagePriority`).
// NOTE(review): `messages!` is imported from the parent module and its expansion is not visible
// here; presumably it implements `EnvelopedMessage` for each listed type — confirm.
messages!(
    (Ack, Foreground),
    (AddProjectCollaborator, Foreground),
    (ApplyCodeAction, Background),
    (ApplyCodeActionResponse, Background),
    (ApplyCompletionAdditionalEdits, Background),
    (ApplyCompletionAdditionalEditsResponse, Background),
    (BufferReloaded, Foreground),
    (BufferSaved, Foreground),
    (Call, Foreground),
    (CallCanceled, Foreground),
    (CancelCall, Foreground),
    (CopyProjectEntry, Foreground),
    (CreateBufferForPeer, Foreground),
    (CreateChannel, Foreground),
    (ChannelResponse, Foreground),
    (CreateProjectEntry, Foreground),
    (CreateRoom, Foreground),
    (CreateRoomResponse, Foreground),
    (DeclineCall, Foreground),
    (DeleteProjectEntry, Foreground),
    (Error, Foreground),
    (ExpandProjectEntry, Foreground),
    (Follow, Foreground),
    (FollowResponse, Foreground),
    (FormatBuffers, Foreground),
    (FormatBuffersResponse, Foreground),
    (FuzzySearchUsers, Foreground),
    (GetCodeActions, Background),
    (GetCodeActionsResponse, Background),
    (GetHover, Background),
    (GetHoverResponse, Background),
    (GetCompletions, Background),
    (GetCompletionsResponse, Background),
    (GetDefinition, Background),
    (GetDefinitionResponse, Background),
    (GetTypeDefinition, Background),
    (GetTypeDefinitionResponse, Background),
    (GetDocumentHighlights, Background),
    (GetDocumentHighlightsResponse, Background),
    (GetReferences, Background),
    (GetReferencesResponse, Background),
    (GetProjectSymbols, Background),
    (GetProjectSymbolsResponse, Background),
    (GetUsers, Foreground),
    (Hello, Foreground),
    (IncomingCall, Foreground),
    (InviteChannelMember, Foreground),
    (UsersResponse, Foreground),
    (JoinProject, Foreground),
    (JoinProjectResponse, Foreground),
    (JoinRoom, Foreground),
    (JoinRoomResponse, Foreground),
    (LeaveProject, Foreground),
    (LeaveRoom, Foreground),
    (OpenBufferById, Background),
    (OpenBufferByPath, Background),
    (OpenBufferForSymbol, Background),
    (OpenBufferForSymbolResponse, Background),
    (OpenBufferResponse, Background),
    (PerformRename, Background),
    (PerformRenameResponse, Background),
    (OnTypeFormatting, Background),
    (OnTypeFormattingResponse, Background),
    (InlayHints, Background),
    (InlayHintsResponse, Background),
    (ResolveInlayHint, Background),
    (ResolveInlayHintResponse, Background),
    (RefreshInlayHints, Foreground),
    (Ping, Foreground),
    (PrepareRename, Background),
    (PrepareRenameResponse, Background),
    (ExpandProjectEntryResponse, Foreground),
    (ProjectEntryResponse, Foreground),
    (RejoinRoom, Foreground),
    (RejoinRoomResponse, Foreground),
    (RemoveContact, Foreground),
    (RemoveChannelMember, Foreground),
    (ReloadBuffers, Foreground),
    (ReloadBuffersResponse, Foreground),
    (RemoveProjectCollaborator, Foreground),
    (RenameProjectEntry, Foreground),
    (RequestContact, Foreground),
    (RespondToContactRequest, Foreground),
    (RespondToChannelInvite, Foreground),
    (JoinChannel, Foreground),
    (RoomUpdated, Foreground),
    (SaveBuffer, Foreground),
    (RenameChannel, Foreground),
    (SetChannelMemberAdmin, Foreground),
    (SearchProject, Background),
    (SearchProjectResponse, Background),
    (ShareProject, Foreground),
    (ShareProjectResponse, Foreground),
    (ShowContacts, Foreground),
    (StartLanguageServer, Foreground),
    (SynchronizeBuffers, Foreground),
    (SynchronizeBuffersResponse, Foreground),
    (RejoinChannelBuffers, Foreground),
    (RejoinChannelBuffersResponse, Foreground),
    (Test, Foreground),
    (Unfollow, Foreground),
    (UnshareProject, Foreground),
    (UpdateBuffer, Foreground),
    (UpdateBufferFile, Foreground),
    (UpdateContacts, Foreground),
    (RemoveChannel, Foreground),
    (UpdateChannels, Foreground),
    (UpdateDiagnosticSummary, Foreground),
    (UpdateFollowers, Foreground),
    (UpdateInviteInfo, Foreground),
    (UpdateLanguageServer, Foreground),
    (UpdateParticipantLocation, Foreground),
    (UpdateProject, Foreground),
    (UpdateProjectCollaborator, Foreground),
    (UpdateWorktree, Foreground),
    (UpdateWorktreeSettings, Foreground),
    (UpdateDiffBase, Foreground),
    (GetPrivateUserInfo, Foreground),
    (GetPrivateUserInfoResponse, Foreground),
    (GetChannelMembers, Foreground),
    (GetChannelMembersResponse, Foreground),
    (JoinChannelBuffer, Foreground),
    (JoinChannelBufferResponse, Foreground),
    (LeaveChannelBuffer, Background),
    (UpdateChannelBuffer, Foreground),
    (RemoveChannelBufferCollaborator, Foreground),
    (AddChannelBufferCollaborator, Foreground),
    (UpdateChannelBufferCollaborator, Foreground),
);
264
// Pairs each request message type with the message type sent in reply, as `(Request, Response)`.
// NOTE(review): `request_messages!` is imported from the parent module and its expansion is not
// visible here; presumably it implements `RequestMessage` with `Response` set to the second
// element of each pair — confirm.
request_messages!(
    (ApplyCodeAction, ApplyCodeActionResponse),
    (
        ApplyCompletionAdditionalEdits,
        ApplyCompletionAdditionalEditsResponse
    ),
    (Call, Ack),
    (CancelCall, Ack),
    (CopyProjectEntry, ProjectEntryResponse),
    (CreateProjectEntry, ProjectEntryResponse),
    (CreateRoom, CreateRoomResponse),
    (CreateChannel, ChannelResponse),
    (DeclineCall, Ack),
    (DeleteProjectEntry, ProjectEntryResponse),
    (ExpandProjectEntry, ExpandProjectEntryResponse),
    (Follow, FollowResponse),
    (FormatBuffers, FormatBuffersResponse),
    (GetCodeActions, GetCodeActionsResponse),
    (GetHover, GetHoverResponse),
    (GetCompletions, GetCompletionsResponse),
    (GetDefinition, GetDefinitionResponse),
    (GetTypeDefinition, GetTypeDefinitionResponse),
    (GetDocumentHighlights, GetDocumentHighlightsResponse),
    (GetReferences, GetReferencesResponse),
    (GetPrivateUserInfo, GetPrivateUserInfoResponse),
    (GetProjectSymbols, GetProjectSymbolsResponse),
    (FuzzySearchUsers, UsersResponse),
    (GetUsers, UsersResponse),
    (InviteChannelMember, Ack),
    (JoinProject, JoinProjectResponse),
    (JoinRoom, JoinRoomResponse),
    (LeaveRoom, Ack),
    (RejoinRoom, RejoinRoomResponse),
    (IncomingCall, Ack),
    (OpenBufferById, OpenBufferResponse),
    (OpenBufferByPath, OpenBufferResponse),
    (OpenBufferForSymbol, OpenBufferForSymbolResponse),
    (Ping, Ack),
    (PerformRename, PerformRenameResponse),
    (PrepareRename, PrepareRenameResponse),
    (OnTypeFormatting, OnTypeFormattingResponse),
    (InlayHints, InlayHintsResponse),
    (ResolveInlayHint, ResolveInlayHintResponse),
    (RefreshInlayHints, Ack),
    (ReloadBuffers, ReloadBuffersResponse),
    (RequestContact, Ack),
    (RemoveChannelMember, Ack),
    (RemoveContact, Ack),
    (RespondToContactRequest, Ack),
    (RespondToChannelInvite, Ack),
    (SetChannelMemberAdmin, Ack),
    (GetChannelMembers, GetChannelMembersResponse),
    (JoinChannel, JoinRoomResponse),
    (RemoveChannel, Ack),
    (RenameProjectEntry, ProjectEntryResponse),
    (RenameChannel, ChannelResponse),
    (SaveBuffer, BufferSaved),
    (SearchProject, SearchProjectResponse),
    (ShareProject, ShareProjectResponse),
    (SynchronizeBuffers, SynchronizeBuffersResponse),
    (RejoinChannelBuffers, RejoinChannelBuffersResponse),
    (Test, Test),
    (UpdateBuffer, Ack),
    (UpdateParticipantLocation, Ack),
    (UpdateProject, Ack),
    (UpdateWorktree, Ack),
    (JoinChannelBuffer, JoinChannelBufferResponse),
    (LeaveChannelBuffer, Ack)
);
334
// Message types associated with a project; the first argument names the `project_id` field.
// NOTE(review): `entity_messages!` is imported from the parent module and its expansion is not
// visible here; presumably it implements `EntityMessage` for each listed type, with
// `remote_entity_id` returning the named field — confirm.
entity_messages!(
    project_id,
    AddProjectCollaborator,
    ApplyCodeAction,
    ApplyCompletionAdditionalEdits,
    BufferReloaded,
    BufferSaved,
    CopyProjectEntry,
    CreateBufferForPeer,
    CreateProjectEntry,
    DeleteProjectEntry,
    ExpandProjectEntry,
    Follow,
    FormatBuffers,
    GetCodeActions,
    GetCompletions,
    GetDefinition,
    GetTypeDefinition,
    GetDocumentHighlights,
    GetHover,
    GetReferences,
    GetProjectSymbols,
    JoinProject,
    LeaveProject,
    OpenBufferById,
    OpenBufferByPath,
    OpenBufferForSymbol,
    PerformRename,
    OnTypeFormatting,
    InlayHints,
    ResolveInlayHint,
    RefreshInlayHints,
    PrepareRename,
    ReloadBuffers,
    RemoveProjectCollaborator,
    RenameProjectEntry,
    SaveBuffer,
    SearchProject,
    StartLanguageServer,
    SynchronizeBuffers,
    Unfollow,
    UnshareProject,
    UpdateBuffer,
    UpdateBufferFile,
    UpdateDiagnosticSummary,
    UpdateFollowers,
    UpdateLanguageServer,
    UpdateProject,
    UpdateProjectCollaborator,
    UpdateWorktree,
    UpdateWorktreeSettings,
    UpdateDiffBase
);
388
// Message types associated with a channel; the first argument names the `channel_id` field.
// NOTE(review): `entity_messages!` is imported from the parent module and its expansion is not
// visible here; presumably it implements `EntityMessage` for each listed type, with
// `remote_entity_id` returning the named field — confirm.
entity_messages!(
    channel_id,
    UpdateChannelBuffer,
    RemoveChannelBufferCollaborator,
    AddChannelBufferCollaborator,
    UpdateChannelBufferCollaborator
);
396
/// Number of bytes in one kibibyte (2^10).
const KIB: usize = 1 << 10;
/// Number of bytes in one mebibyte (2^20).
const MIB: usize = KIB << 10;
/// Capacity, in bytes, that a `MessageStream`'s scratch buffer is shrunk back to after each
/// message is written or read.
const MAX_BUFFER_LEN: usize = MIB;
400
/// A stream of protobuf messages.
///
/// Wraps an underlying WebSocket transport `S`. Depending on which traits `S` implements, the
/// wrapper offers `write` (for a `Sink`) and/or `read` (for a `Stream`).
pub struct MessageStream<S> {
    /// The wrapped sink and/or stream of WebSocket messages.
    stream: S,
    /// Scratch buffer reused across calls: holds the protobuf-encoded envelope during `write` and
    /// the decompressed bytes during `read`. It is cleared and shrunk to at most
    /// `MAX_BUFFER_LEN` after each message.
    encoding_buffer: Vec<u8>,
}
406
/// A message exchanged over a `MessageStream`.
// `Envelope` is much larger than the unit variants, hence the lint suppression below.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum Message {
    /// A protobuf `Envelope`, sent as a compressed binary WebSocket frame.
    Envelope(Envelope),
    /// A WebSocket ping frame.
    Ping,
    /// A WebSocket pong frame.
    Pong,
}
414
415impl<S> MessageStream<S> {
416 pub fn new(stream: S) -> Self {
417 Self {
418 stream,
419 encoding_buffer: Vec::new(),
420 }
421 }
422
423 pub fn inner_mut(&mut self) -> &mut S {
424 &mut self.stream
425 }
426}
427
428impl<S> MessageStream<S>
429where
430 S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
431{
432 pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
433 #[cfg(any(test, feature = "test-support"))]
434 const COMPRESSION_LEVEL: i32 = -7;
435
436 #[cfg(not(any(test, feature = "test-support")))]
437 const COMPRESSION_LEVEL: i32 = 4;
438
439 match message {
440 Message::Envelope(message) => {
441 self.encoding_buffer.reserve(message.encoded_len());
442 message
443 .encode(&mut self.encoding_buffer)
444 .map_err(io::Error::from)?;
445 let buffer =
446 zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
447 .unwrap();
448
449 self.encoding_buffer.clear();
450 self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
451 self.stream.send(WebSocketMessage::Binary(buffer)).await?;
452 }
453 Message::Ping => {
454 self.stream
455 .send(WebSocketMessage::Ping(Default::default()))
456 .await?;
457 }
458 Message::Pong => {
459 self.stream
460 .send(WebSocketMessage::Pong(Default::default()))
461 .await?;
462 }
463 }
464
465 Ok(())
466 }
467}
468
469impl<S> MessageStream<S>
470where
471 S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
472{
473 pub async fn read(&mut self) -> Result<Message, anyhow::Error> {
474 while let Some(bytes) = self.stream.next().await {
475 match bytes? {
476 WebSocketMessage::Binary(bytes) => {
477 zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
478 let envelope = Envelope::decode(self.encoding_buffer.as_slice())
479 .map_err(io::Error::from)?;
480
481 self.encoding_buffer.clear();
482 self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
483 return Ok(Message::Envelope(envelope));
484 }
485 WebSocketMessage::Ping(_) => return Ok(Message::Ping),
486 WebSocketMessage::Pong(_) => return Ok(Message::Pong),
487 WebSocketMessage::Close(_) => break,
488 _ => {}
489 }
490 }
491 Err(anyhow!("connection closed"))
492 }
493}
494
495impl From<Timestamp> for SystemTime {
496 fn from(val: Timestamp) -> Self {
497 UNIX_EPOCH
498 .checked_add(Duration::new(val.seconds, val.nanos))
499 .unwrap()
500 }
501}
502
503impl From<SystemTime> for Timestamp {
504 fn from(time: SystemTime) -> Self {
505 let duration = time.duration_since(UNIX_EPOCH).unwrap();
506 Self {
507 seconds: duration.as_secs(),
508 nanos: duration.subsec_nanos(),
509 }
510 }
511}
512
513impl From<u128> for Nonce {
514 fn from(nonce: u128) -> Self {
515 let upper_half = (nonce >> 64) as u64;
516 let lower_half = nonce as u64;
517 Self {
518 upper_half,
519 lower_half,
520 }
521 }
522}
523
524impl From<Nonce> for u128 {
525 fn from(nonce: Nonce) -> Self {
526 let upper_half = (nonce.upper_half as u128) << 64;
527 let lower_half = nonce.lower_half as u128;
528 upper_half | lower_half
529 }
530}
531
532pub fn split_worktree_update(
533 mut message: UpdateWorktree,
534 max_chunk_size: usize,
535) -> impl Iterator<Item = UpdateWorktree> {
536 let mut done_files = false;
537
538 let mut repository_map = message
539 .updated_repositories
540 .into_iter()
541 .map(|repo| (repo.work_directory_id, repo))
542 .collect::<HashMap<_, _>>();
543
544 iter::from_fn(move || {
545 if done_files {
546 return None;
547 }
548
549 let updated_entries_chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
550 let updated_entries: Vec<_> = message
551 .updated_entries
552 .drain(..updated_entries_chunk_size)
553 .collect();
554
555 let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size);
556 let removed_entries = message
557 .removed_entries
558 .drain(..removed_entries_chunk_size)
559 .collect();
560
561 done_files = message.updated_entries.is_empty() && message.removed_entries.is_empty();
562
563 let mut updated_repositories = Vec::new();
564
565 if !repository_map.is_empty() {
566 for entry in &updated_entries {
567 if let Some(repo) = repository_map.remove(&entry.id) {
568 updated_repositories.push(repo)
569 }
570 }
571 }
572
573 let removed_repositories = if done_files {
574 mem::take(&mut message.removed_repositories)
575 } else {
576 Default::default()
577 };
578
579 if done_files {
580 updated_repositories.extend(mem::take(&mut repository_map).into_values());
581 }
582
583 Some(UpdateWorktree {
584 project_id: message.project_id,
585 worktree_id: message.worktree_id,
586 root_name: message.root_name.clone(),
587 abs_path: message.abs_path.clone(),
588 updated_entries,
589 removed_entries,
590 scan_id: message.scan_id,
591 is_last_update: done_files && message.is_last_update,
592 updated_repositories,
593 removed_repositories,
594 })
595 })
596}
597
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an envelope message whose payload is an `UpdateWorktree` with the given root name.
    fn worktree_envelope(root_name: String) -> Message {
        Message::Envelope(Envelope {
            payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
                root_name,
                ..Default::default()
            })),
            ..Default::default()
        })
    }

    /// The scratch buffer must never stay larger than `MAX_BUFFER_LEN` after a write or a read,
    /// regardless of how large the message was.
    #[gpui::test]
    async fn test_buffer_size() {
        let (tx, rx) = futures::channel::mpsc::unbounded();
        let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));

        // One small message followed by one far larger than `MAX_BUFFER_LEN`.
        for repetitions in [10, 1000000] {
            sink.write(worktree_envelope("abcdefg".repeat(repetitions)))
                .await
                .unwrap();
            assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
        }

        let mut stream = MessageStream::new(rx.map(anyhow::Ok));
        for _ in 0..2 {
            stream.read().await.unwrap();
            assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
        }
    }

    /// `PeerId` must survive a round trip through its `u64` encoding, including at the extremes
    /// of both halves.
    #[gpui::test]
    fn test_converting_peer_id_from_and_to_u64() {
        let cases = [
            (10, 3),
            (u32::MAX, 3),
            (10, u32::MAX),
            (u32::MAX, u32::MAX),
        ];
        for (owner_id, id) in cases {
            let peer_id = PeerId { owner_id, id };
            assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
        }
    }
}