1use super::{entity_messages, messages, request_messages, ConnectionId, PeerId, TypedEnvelope};
2use anyhow::{anyhow, Result};
3use async_tungstenite::tungstenite::Message as WebSocketMessage;
4use futures::{SinkExt as _, StreamExt as _};
5use prost::Message as _;
6use serde::Serialize;
7use std::any::{Any, TypeId};
8use std::{cmp, iter, mem};
9use std::{
10 fmt::Debug,
11 io,
12 time::{Duration, SystemTime, UNIX_EPOCH},
13};
14
15include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
16
17pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
18 const NAME: &'static str;
19 const PRIORITY: MessagePriority;
20 fn into_envelope(
21 self,
22 id: u32,
23 responding_to: Option<u32>,
24 original_sender_id: Option<u32>,
25 ) -> Envelope;
26 fn from_envelope(envelope: Envelope) -> Option<Self>;
27}
28
29pub trait EntityMessage: EnvelopedMessage {
30 fn remote_entity_id(&self) -> u64;
31}
32
33pub trait RequestMessage: EnvelopedMessage {
34 type Response: EnvelopedMessage;
35}
36
37pub trait AnyTypedEnvelope: 'static + Send + Sync {
38 fn payload_type_id(&self) -> TypeId;
39 fn payload_type_name(&self) -> &'static str;
40 fn as_any(&self) -> &dyn Any;
41 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
42 fn is_background(&self) -> bool;
43 fn original_sender_id(&self) -> Option<PeerId>;
44}
45
46pub enum MessagePriority {
47 Foreground,
48 Background,
49}
50
51impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
52 fn payload_type_id(&self) -> TypeId {
53 TypeId::of::<T>()
54 }
55
56 fn payload_type_name(&self) -> &'static str {
57 T::NAME
58 }
59
60 fn as_any(&self) -> &dyn Any {
61 self
62 }
63
64 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
65 self
66 }
67
68 fn is_background(&self) -> bool {
69 matches!(T::PRIORITY, MessagePriority::Background)
70 }
71
72 fn original_sender_id(&self) -> Option<PeerId> {
73 self.original_sender_id
74 }
75}
76
77messages!(
78 (Ack, Foreground),
79 (AddProjectCollaborator, Foreground),
80 (ApplyCodeAction, Background),
81 (ApplyCodeActionResponse, Background),
82 (ApplyCompletionAdditionalEdits, Background),
83 (ApplyCompletionAdditionalEditsResponse, Background),
84 (BufferReloaded, Foreground),
85 (BufferSaved, Foreground),
86 (Call, Foreground),
87 (CancelCall, Foreground),
88 (ChannelMessageSent, Foreground),
89 (CopyProjectEntry, Foreground),
90 (CreateBufferForPeer, Foreground),
91 (CreateProjectEntry, Foreground),
92 (CreateRoom, Foreground),
93 (CreateRoomResponse, Foreground),
94 (DeclineCall, Foreground),
95 (DeleteProjectEntry, Foreground),
96 (Error, Foreground),
97 (Follow, Foreground),
98 (FollowResponse, Foreground),
99 (FormatBuffers, Foreground),
100 (FormatBuffersResponse, Foreground),
101 (FuzzySearchUsers, Foreground),
102 (GetChannelMessages, Foreground),
103 (GetChannelMessagesResponse, Foreground),
104 (GetChannels, Foreground),
105 (GetChannelsResponse, Foreground),
106 (GetCodeActions, Background),
107 (GetCodeActionsResponse, Background),
108 (GetHover, Background),
109 (GetHoverResponse, Background),
110 (GetCompletions, Background),
111 (GetCompletionsResponse, Background),
112 (GetDefinition, Background),
113 (GetDefinitionResponse, Background),
114 (GetTypeDefinition, Background),
115 (GetTypeDefinitionResponse, Background),
116 (GetDocumentHighlights, Background),
117 (GetDocumentHighlightsResponse, Background),
118 (GetReferences, Background),
119 (GetReferencesResponse, Background),
120 (GetProjectSymbols, Background),
121 (GetProjectSymbolsResponse, Background),
122 (GetUsers, Foreground),
123 (IncomingCall, Foreground),
124 (UsersResponse, Foreground),
125 (JoinChannel, Foreground),
126 (JoinChannelResponse, Foreground),
127 (JoinProject, Foreground),
128 (JoinProjectResponse, Foreground),
129 (JoinRoom, Foreground),
130 (JoinRoomResponse, Foreground),
131 (LeaveChannel, Foreground),
132 (LeaveProject, Foreground),
133 (LeaveRoom, Foreground),
134 (OpenBufferById, Background),
135 (OpenBufferByPath, Background),
136 (OpenBufferForSymbol, Background),
137 (OpenBufferForSymbolResponse, Background),
138 (OpenBufferResponse, Background),
139 (PerformRename, Background),
140 (PerformRenameResponse, Background),
141 (PrepareRename, Background),
142 (PrepareRenameResponse, Background),
143 (ProjectEntryResponse, Foreground),
144 (RemoveContact, Foreground),
145 (Ping, Foreground),
146 (RegisterProjectActivity, Foreground),
147 (ReloadBuffers, Foreground),
148 (ReloadBuffersResponse, Foreground),
149 (RemoveProjectCollaborator, Foreground),
150 (RenameProjectEntry, Foreground),
151 (RequestContact, Foreground),
152 (RespondToContactRequest, Foreground),
153 (RoomUpdated, Foreground),
154 (SaveBuffer, Foreground),
155 (SearchProject, Background),
156 (SearchProjectResponse, Background),
157 (SendChannelMessage, Foreground),
158 (SendChannelMessageResponse, Foreground),
159 (ShareProject, Foreground),
160 (ShareProjectResponse, Foreground),
161 (ShowContacts, Foreground),
162 (StartLanguageServer, Foreground),
163 (Test, Foreground),
164 (Unfollow, Foreground),
165 (UnshareProject, Foreground),
166 (UpdateBuffer, Foreground),
167 (UpdateBufferFile, Foreground),
168 (UpdateContacts, Foreground),
169 (UpdateDiagnosticSummary, Foreground),
170 (UpdateFollowers, Foreground),
171 (UpdateInviteInfo, Foreground),
172 (UpdateLanguageServer, Foreground),
173 (UpdateParticipantLocation, Foreground),
174 (UpdateProject, Foreground),
175 (UpdateWorktree, Foreground),
176 (UpdateWorktreeExtensions, Background),
177);
178
179request_messages!(
180 (ApplyCodeAction, ApplyCodeActionResponse),
181 (
182 ApplyCompletionAdditionalEdits,
183 ApplyCompletionAdditionalEditsResponse
184 ),
185 (Call, Ack),
186 (CopyProjectEntry, ProjectEntryResponse),
187 (CreateProjectEntry, ProjectEntryResponse),
188 (CreateRoom, CreateRoomResponse),
189 (DeclineCall, Ack),
190 (DeleteProjectEntry, ProjectEntryResponse),
191 (Follow, FollowResponse),
192 (FormatBuffers, FormatBuffersResponse),
193 (GetChannelMessages, GetChannelMessagesResponse),
194 (GetChannels, GetChannelsResponse),
195 (GetCodeActions, GetCodeActionsResponse),
196 (GetHover, GetHoverResponse),
197 (GetCompletions, GetCompletionsResponse),
198 (GetDefinition, GetDefinitionResponse),
199 (GetTypeDefinition, GetTypeDefinitionResponse),
200 (GetDocumentHighlights, GetDocumentHighlightsResponse),
201 (GetReferences, GetReferencesResponse),
202 (GetProjectSymbols, GetProjectSymbolsResponse),
203 (FuzzySearchUsers, UsersResponse),
204 (GetUsers, UsersResponse),
205 (JoinChannel, JoinChannelResponse),
206 (JoinProject, JoinProjectResponse),
207 (JoinRoom, JoinRoomResponse),
208 (IncomingCall, Ack),
209 (OpenBufferById, OpenBufferResponse),
210 (OpenBufferByPath, OpenBufferResponse),
211 (OpenBufferForSymbol, OpenBufferForSymbolResponse),
212 (Ping, Ack),
213 (PerformRename, PerformRenameResponse),
214 (PrepareRename, PrepareRenameResponse),
215 (ReloadBuffers, ReloadBuffersResponse),
216 (RequestContact, Ack),
217 (RemoveContact, Ack),
218 (RespondToContactRequest, Ack),
219 (RenameProjectEntry, ProjectEntryResponse),
220 (SaveBuffer, BufferSaved),
221 (SearchProject, SearchProjectResponse),
222 (SendChannelMessage, SendChannelMessageResponse),
223 (ShareProject, ShareProjectResponse),
224 (Test, Test),
225 (UpdateBuffer, Ack),
226 (UpdateParticipantLocation, Ack),
227 (UpdateWorktree, Ack),
228);
229
230entity_messages!(
231 project_id,
232 AddProjectCollaborator,
233 ApplyCodeAction,
234 ApplyCompletionAdditionalEdits,
235 BufferReloaded,
236 BufferSaved,
237 CopyProjectEntry,
238 CreateBufferForPeer,
239 CreateProjectEntry,
240 DeleteProjectEntry,
241 Follow,
242 FormatBuffers,
243 GetCodeActions,
244 GetCompletions,
245 GetDefinition,
246 GetTypeDefinition,
247 GetDocumentHighlights,
248 GetHover,
249 GetReferences,
250 GetProjectSymbols,
251 JoinProject,
252 LeaveProject,
253 OpenBufferById,
254 OpenBufferByPath,
255 OpenBufferForSymbol,
256 PerformRename,
257 PrepareRename,
258 RegisterProjectActivity,
259 ReloadBuffers,
260 RemoveProjectCollaborator,
261 RenameProjectEntry,
262 SaveBuffer,
263 SearchProject,
264 StartLanguageServer,
265 Unfollow,
266 UnshareProject,
267 UpdateBuffer,
268 UpdateBufferFile,
269 UpdateDiagnosticSummary,
270 UpdateFollowers,
271 UpdateLanguageServer,
272 UpdateProject,
273 UpdateWorktree,
274 UpdateWorktreeExtensions,
275);
276
277entity_messages!(channel_id, ChannelMessageSent);
278
279const KIB: usize = 1024;
280const MIB: usize = KIB * 1024;
281const MAX_BUFFER_LEN: usize = MIB;
282
283/// A stream of protobuf messages.
284pub struct MessageStream<S> {
285 stream: S,
286 encoding_buffer: Vec<u8>,
287}
288
289#[allow(clippy::large_enum_variant)]
290#[derive(Debug)]
291pub enum Message {
292 Envelope(Envelope),
293 Ping,
294 Pong,
295}
296
297impl<S> MessageStream<S> {
298 pub fn new(stream: S) -> Self {
299 Self {
300 stream,
301 encoding_buffer: Vec::new(),
302 }
303 }
304
305 pub fn inner_mut(&mut self) -> &mut S {
306 &mut self.stream
307 }
308}
309
310impl<S> MessageStream<S>
311where
312 S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
313{
314 pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
315 #[cfg(any(test, feature = "test-support"))]
316 const COMPRESSION_LEVEL: i32 = -7;
317
318 #[cfg(not(any(test, feature = "test-support")))]
319 const COMPRESSION_LEVEL: i32 = 4;
320
321 match message {
322 Message::Envelope(message) => {
323 self.encoding_buffer.reserve(message.encoded_len());
324 message
325 .encode(&mut self.encoding_buffer)
326 .map_err(io::Error::from)?;
327 let buffer =
328 zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
329 .unwrap();
330
331 self.encoding_buffer.clear();
332 self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
333 self.stream.send(WebSocketMessage::Binary(buffer)).await?;
334 }
335 Message::Ping => {
336 self.stream
337 .send(WebSocketMessage::Ping(Default::default()))
338 .await?;
339 }
340 Message::Pong => {
341 self.stream
342 .send(WebSocketMessage::Pong(Default::default()))
343 .await?;
344 }
345 }
346
347 Ok(())
348 }
349}
350
351impl<S> MessageStream<S>
352where
353 S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
354{
355 pub async fn read(&mut self) -> Result<Message, anyhow::Error> {
356 while let Some(bytes) = self.stream.next().await {
357 match bytes? {
358 WebSocketMessage::Binary(bytes) => {
359 zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
360 let envelope = Envelope::decode(self.encoding_buffer.as_slice())
361 .map_err(io::Error::from)?;
362
363 self.encoding_buffer.clear();
364 self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
365 return Ok(Message::Envelope(envelope));
366 }
367 WebSocketMessage::Ping(_) => return Ok(Message::Ping),
368 WebSocketMessage::Pong(_) => return Ok(Message::Pong),
369 WebSocketMessage::Close(_) => break,
370 _ => {}
371 }
372 }
373 Err(anyhow!("connection closed"))
374 }
375}
376
377impl From<Timestamp> for SystemTime {
378 fn from(val: Timestamp) -> Self {
379 UNIX_EPOCH
380 .checked_add(Duration::new(val.seconds, val.nanos))
381 .unwrap()
382 }
383}
384
385impl From<SystemTime> for Timestamp {
386 fn from(time: SystemTime) -> Self {
387 let duration = time.duration_since(UNIX_EPOCH).unwrap();
388 Self {
389 seconds: duration.as_secs(),
390 nanos: duration.subsec_nanos(),
391 }
392 }
393}
394
395impl From<u128> for Nonce {
396 fn from(nonce: u128) -> Self {
397 let upper_half = (nonce >> 64) as u64;
398 let lower_half = nonce as u64;
399 Self {
400 upper_half,
401 lower_half,
402 }
403 }
404}
405
406impl From<Nonce> for u128 {
407 fn from(nonce: Nonce) -> Self {
408 let upper_half = (nonce.upper_half as u128) << 64;
409 let lower_half = nonce.lower_half as u128;
410 upper_half | lower_half
411 }
412}
413
414pub fn split_worktree_update(
415 mut message: UpdateWorktree,
416 max_chunk_size: usize,
417) -> impl Iterator<Item = UpdateWorktree> {
418 let mut done = false;
419 iter::from_fn(move || {
420 if done {
421 return None;
422 }
423
424 let chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
425 let updated_entries = message.updated_entries.drain(..chunk_size).collect();
426 done = message.updated_entries.is_empty();
427 Some(UpdateWorktree {
428 project_id: message.project_id,
429 worktree_id: message.worktree_id,
430 root_name: message.root_name.clone(),
431 updated_entries,
432 removed_entries: mem::take(&mut message.removed_entries),
433 scan_id: message.scan_id,
434 is_last_update: done && message.is_last_update,
435 })
436 })
437}
438
439#[cfg(test)]
440mod tests {
441 use super::*;
442
443 #[gpui::test]
444 async fn test_buffer_size() {
445 let (tx, rx) = futures::channel::mpsc::unbounded();
446 let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
447 sink.write(Message::Envelope(Envelope {
448 payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
449 root_name: "abcdefg".repeat(10),
450 ..Default::default()
451 })),
452 ..Default::default()
453 }))
454 .await
455 .unwrap();
456 assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
457 sink.write(Message::Envelope(Envelope {
458 payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
459 root_name: "abcdefg".repeat(1000000),
460 ..Default::default()
461 })),
462 ..Default::default()
463 }))
464 .await
465 .unwrap();
466 assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
467
468 let mut stream = MessageStream::new(rx.map(anyhow::Ok));
469 stream.read().await.unwrap();
470 assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
471 stream.read().await.unwrap();
472 assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
473 }
474}