1use super::{entity_messages, messages, request_messages, ConnectionId, PeerId, TypedEnvelope};
2use anyhow::{anyhow, Result};
3use async_tungstenite::tungstenite::Message as WebSocketMessage;
4use futures::{SinkExt as _, StreamExt as _};
5use prost::Message as _;
6use serde::Serialize;
7use std::any::{Any, TypeId};
8use std::{cmp, iter, mem};
9use std::{
10 fmt::Debug,
11 io,
12 time::{Duration, SystemTime, UNIX_EPOCH},
13};
14
// Textually includes the file `zed.messages.rs` from the build-time `OUT_DIR` directory.
// NOTE(review): presumably this is the code generated from the protobuf schema and the
// place where `Envelope`, `Timestamp`, `Nonce`, `UpdateWorktree`, ... are declared —
// confirm against the crate's build script.
include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
16
/// A message type that can be packed into, and unpacked from, an [`Envelope`].
pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
    /// Name of the message type; reported by `AnyTypedEnvelope::payload_type_name`.
    const NAME: &'static str;
    /// Priority class of the message; `AnyTypedEnvelope::is_background` reports whether
    /// this is `MessagePriority::Background`.
    const PRIORITY: MessagePriority;
    /// Consumes the message and wraps it in an [`Envelope`].
    ///
    /// `id` is the envelope's own id, `responding_to` the id of the message this one
    /// answers (if any), and `original_sender_id` the id of the original sender (if any).
    fn into_envelope(
        self,
        id: u32,
        responding_to: Option<u32>,
        original_sender_id: Option<u32>,
    ) -> Envelope;
    /// Extracts a message of this type from `envelope`, or returns `None`.
    ///
    /// NOTE(review): presumably `None` means the envelope carries a different payload
    /// type — confirm against the `messages!` macro that provides the implementations.
    fn from_envelope(envelope: Envelope) -> Option<Self>;
}
28
/// An [`EnvelopedMessage`] that refers to a particular remote entity.
pub trait EntityMessage: EnvelopedMessage {
    /// Returns the id of the remote entity this message refers to.
    ///
    /// NOTE(review): the `entity_messages!` invocations in this file name a field
    /// (`project_id`, `channel_id`); presumably that field supplies this value —
    /// confirm against the macro definition.
    fn remote_entity_id(&self) -> u64;
}
32
/// An [`EnvelopedMessage`] that has an associated response message type.
pub trait RequestMessage: EnvelopedMessage {
    /// The message type associated with this request as its response.
    type Response: EnvelopedMessage;
}
36
/// Type-erased interface over a typed envelope.
///
/// This file implements it for `TypedEnvelope<T>` for every `T: EnvelopedMessage`.
pub trait AnyTypedEnvelope: 'static + Send + Sync {
    /// The [`TypeId`] of the payload message type.
    fn payload_type_id(&self) -> TypeId;
    /// The payload message type's name (`EnvelopedMessage::NAME`).
    fn payload_type_name(&self) -> &'static str;
    /// Borrows the envelope as `&dyn Any`, e.g. for downcasting to the concrete type.
    fn as_any(&self) -> &dyn Any;
    /// Converts the boxed envelope into a boxed `dyn Any`, e.g. for an owning downcast.
    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
    /// Whether the payload type's priority is `MessagePriority::Background`.
    fn is_background(&self) -> bool;
    /// The id of the peer that originally sent the message, if recorded on the envelope.
    fn original_sender_id(&self) -> Option<PeerId>;
}
45
/// Priority class attached to every message type via `EnvelopedMessage::PRIORITY`.
///
/// The common value-type traits are derived so the priority can be logged, copied
/// and compared directly instead of only being pattern-matched.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum MessagePriority {
    /// The message type is classified as foreground.
    Foreground,
    /// The message type is classified as background; `AnyTypedEnvelope::is_background`
    /// returns `true` for such messages.
    Background,
}
50
51impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
52 fn payload_type_id(&self) -> TypeId {
53 TypeId::of::<T>()
54 }
55
56 fn payload_type_name(&self) -> &'static str {
57 T::NAME
58 }
59
60 fn as_any(&self) -> &dyn Any {
61 self
62 }
63
64 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
65 self
66 }
67
68 fn is_background(&self) -> bool {
69 matches!(T::PRIORITY, MessagePriority::Background)
70 }
71
72 fn original_sender_id(&self) -> Option<PeerId> {
73 self.original_sender_id
74 }
75}
76
// Every message type paired with the name of a `MessagePriority` variant
// (`Foreground` or `Background`).
// NOTE(review): the `messages!` macro is defined outside this file; presumably it
// implements `EnvelopedMessage` for each listed type with the given priority —
// confirm against the macro definition. The list order is left as-is in case the
// macro's expansion depends on it.
messages!(
    (Ack, Foreground),
    (AddProjectCollaborator, Foreground),
    (ApplyCodeAction, Background),
    (ApplyCodeActionResponse, Background),
    (ApplyCompletionAdditionalEdits, Background),
    (ApplyCompletionAdditionalEditsResponse, Background),
    (BufferReloaded, Foreground),
    (BufferSaved, Foreground),
    (RemoveContact, Foreground),
    (ChannelMessageSent, Foreground),
    (CopyProjectEntry, Foreground),
    (CreateProjectEntry, Foreground),
    (DeleteProjectEntry, Foreground),
    (Error, Foreground),
    (Follow, Foreground),
    (FollowResponse, Foreground),
    (FormatBuffers, Foreground),
    (FormatBuffersResponse, Foreground),
    (FuzzySearchUsers, Foreground),
    (GetChannelMessages, Foreground),
    (GetChannelMessagesResponse, Foreground),
    (GetChannels, Foreground),
    (GetChannelsResponse, Foreground),
    (GetCodeActions, Background),
    (GetCodeActionsResponse, Background),
    (GetHover, Background),
    (GetHoverResponse, Background),
    (GetCompletions, Background),
    (GetCompletionsResponse, Background),
    (GetDefinition, Background),
    (GetDefinitionResponse, Background),
    (GetDocumentHighlights, Background),
    (GetDocumentHighlightsResponse, Background),
    (GetReferences, Background),
    (GetReferencesResponse, Background),
    (GetProjectSymbols, Background),
    (GetProjectSymbolsResponse, Background),
    (GetUsers, Foreground),
    (UsersResponse, Foreground),
    (JoinChannel, Foreground),
    (JoinChannelResponse, Foreground),
    (JoinProject, Foreground),
    (JoinProjectResponse, Foreground),
    (JoinProjectRequestCancelled, Foreground),
    (LeaveChannel, Foreground),
    (LeaveProject, Foreground),
    (OpenBufferById, Background),
    (OpenBufferByPath, Background),
    (OpenBufferForSymbol, Background),
    (OpenBufferForSymbolResponse, Background),
    (OpenBufferResponse, Background),
    (PerformRename, Background),
    (PerformRenameResponse, Background),
    (PrepareRename, Background),
    (PrepareRenameResponse, Background),
    (ProjectEntryResponse, Foreground),
    (ProjectUnshared, Foreground),
    (RegisterProjectResponse, Foreground),
    (Ping, Foreground),
    (RegisterProject, Foreground),
    (RegisterProjectActivity, Foreground),
    (ReloadBuffers, Foreground),
    (ReloadBuffersResponse, Foreground),
    (RemoveProjectCollaborator, Foreground),
    (RenameProjectEntry, Foreground),
    (RequestContact, Foreground),
    (RequestJoinProject, Foreground),
    (RespondToContactRequest, Foreground),
    (RespondToJoinProjectRequest, Foreground),
    (SaveBuffer, Foreground),
    (SearchProject, Background),
    (SearchProjectResponse, Background),
    (SendChannelMessage, Foreground),
    (SendChannelMessageResponse, Foreground),
    (ShowContacts, Foreground),
    (StartLanguageServer, Foreground),
    (Test, Foreground),
    (Unfollow, Foreground),
    (UnregisterProject, Foreground),
    (UpdateBuffer, Foreground),
    (UpdateBufferFile, Foreground),
    (UpdateContacts, Foreground),
    (UpdateDiagnosticSummary, Foreground),
    (UpdateFollowers, Foreground),
    (UpdateInviteInfo, Foreground),
    (UpdateLanguageServer, Foreground),
    (UpdateProject, Foreground),
    (UpdateWorktree, Foreground),
    (UpdateWorktreeExtensions, Background),
);
168
// Pairs of `(request type, response type)`.
// NOTE(review): the `request_messages!` macro is defined outside this file; presumably
// it implements `RequestMessage` for the first type of each pair with the second as
// `Response` — confirm against the macro definition. Several requests share a response
// type (e.g. `Ack`, `ProjectEntryResponse`, `UsersResponse`, `OpenBufferResponse`).
request_messages!(
    (ApplyCodeAction, ApplyCodeActionResponse),
    (
        ApplyCompletionAdditionalEdits,
        ApplyCompletionAdditionalEditsResponse
    ),
    (CopyProjectEntry, ProjectEntryResponse),
    (CreateProjectEntry, ProjectEntryResponse),
    (DeleteProjectEntry, ProjectEntryResponse),
    (Follow, FollowResponse),
    (FormatBuffers, FormatBuffersResponse),
    (GetChannelMessages, GetChannelMessagesResponse),
    (GetChannels, GetChannelsResponse),
    (GetCodeActions, GetCodeActionsResponse),
    (GetHover, GetHoverResponse),
    (GetCompletions, GetCompletionsResponse),
    (GetDefinition, GetDefinitionResponse),
    (GetDocumentHighlights, GetDocumentHighlightsResponse),
    (GetReferences, GetReferencesResponse),
    (GetProjectSymbols, GetProjectSymbolsResponse),
    (FuzzySearchUsers, UsersResponse),
    (GetUsers, UsersResponse),
    (JoinChannel, JoinChannelResponse),
    (JoinProject, JoinProjectResponse),
    (OpenBufferById, OpenBufferResponse),
    (OpenBufferByPath, OpenBufferResponse),
    (OpenBufferForSymbol, OpenBufferForSymbolResponse),
    (Ping, Ack),
    (PerformRename, PerformRenameResponse),
    (PrepareRename, PrepareRenameResponse),
    (RegisterProject, RegisterProjectResponse),
    (ReloadBuffers, ReloadBuffersResponse),
    (RequestContact, Ack),
    (RemoveContact, Ack),
    (RespondToContactRequest, Ack),
    (RenameProjectEntry, ProjectEntryResponse),
    (SaveBuffer, BufferSaved),
    (SearchProject, SearchProjectResponse),
    (SendChannelMessage, SendChannelMessageResponse),
    (Test, Test),
    (UnregisterProject, Ack),
    (UpdateBuffer, Ack),
    (UpdateWorktree, Ack),
);
213
// Message types listed after the field name `project_id`.
// NOTE(review): the `entity_messages!` macro is defined outside this file; presumably
// it implements `EntityMessage` for each listed type, returning the named field from
// `remote_entity_id` — confirm against the macro definition.
entity_messages!(
    project_id,
    AddProjectCollaborator,
    ApplyCodeAction,
    ApplyCompletionAdditionalEdits,
    BufferReloaded,
    BufferSaved,
    CopyProjectEntry,
    CreateProjectEntry,
    DeleteProjectEntry,
    Follow,
    FormatBuffers,
    GetCodeActions,
    GetCompletions,
    GetDefinition,
    GetDocumentHighlights,
    GetHover,
    GetReferences,
    GetProjectSymbols,
    JoinProject,
    JoinProjectRequestCancelled,
    LeaveProject,
    OpenBufferById,
    OpenBufferByPath,
    OpenBufferForSymbol,
    PerformRename,
    PrepareRename,
    ProjectUnshared,
    RegisterProjectActivity,
    ReloadBuffers,
    RemoveProjectCollaborator,
    RenameProjectEntry,
    RequestJoinProject,
    SaveBuffer,
    SearchProject,
    StartLanguageServer,
    Unfollow,
    UnregisterProject,
    UpdateBuffer,
    UpdateBufferFile,
    UpdateDiagnosticSummary,
    UpdateFollowers,
    UpdateLanguageServer,
    UpdateProject,
    UpdateWorktree,
    UpdateWorktreeExtensions,
);
261
// Same macro as above, with `channel_id` as the named field and a single message type.
// NOTE(review): presumably makes `ChannelMessageSent` an `EntityMessage` keyed by its
// `channel_id` — confirm against the macro definition.
entity_messages!(channel_id, ChannelMessageSent);
263
/// Capacity, in bytes (1 MiB), that the reusable encoding buffer is shrunk down to
/// after each envelope is written or read.
const MAX_BUFFER_LEN: usize = 1024 * 1024;
265
/// A stream of protobuf messages.
///
/// Wraps a transport `S` and converts between [`Message`] values and WebSocket
/// messages: `write` is available when `S` is a sink, `read` when `S` is a stream.
pub struct MessageStream<S> {
    /// The wrapped transport.
    stream: S,
    /// Scratch buffer holding the uncompressed protobuf bytes of the envelope currently
    /// being written or read. It is reused across calls, and cleared and shrunk to at
    /// most `MAX_BUFFER_LEN` after each envelope.
    encoding_buffer: Vec<u8>,
}
271
/// A message that can be written to or read from a [`MessageStream`].
#[derive(Debug)]
pub enum Message {
    /// A protobuf envelope; carried as a binary WebSocket message.
    Envelope(Envelope),
    /// A WebSocket ping; written with an empty payload.
    Ping,
    /// A WebSocket pong; written with an empty payload.
    Pong,
}
278
279impl<S> MessageStream<S> {
280 pub fn new(stream: S) -> Self {
281 Self {
282 stream,
283 encoding_buffer: Vec::new(),
284 }
285 }
286
287 pub fn inner_mut(&mut self) -> &mut S {
288 &mut self.stream
289 }
290}
291
292impl<S> MessageStream<S>
293where
294 S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
295{
296 pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
297 #[cfg(any(test, feature = "test-support"))]
298 const COMPRESSION_LEVEL: i32 = -7;
299
300 #[cfg(not(any(test, feature = "test-support")))]
301 const COMPRESSION_LEVEL: i32 = 4;
302
303 match message {
304 Message::Envelope(message) => {
305 self.encoding_buffer.reserve(message.encoded_len());
306 message
307 .encode(&mut self.encoding_buffer)
308 .map_err(|err| io::Error::from(err))?;
309 let buffer =
310 zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
311 .unwrap();
312
313 self.encoding_buffer.clear();
314 self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
315 self.stream.send(WebSocketMessage::Binary(buffer)).await?;
316 }
317 Message::Ping => {
318 self.stream
319 .send(WebSocketMessage::Ping(Default::default()))
320 .await?;
321 }
322 Message::Pong => {
323 self.stream
324 .send(WebSocketMessage::Pong(Default::default()))
325 .await?;
326 }
327 }
328
329 Ok(())
330 }
331}
332
333impl<S> MessageStream<S>
334where
335 S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
336{
337 pub async fn read(&mut self) -> Result<Message, anyhow::Error> {
338 while let Some(bytes) = self.stream.next().await {
339 match bytes? {
340 WebSocketMessage::Binary(bytes) => {
341 zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
342 let envelope = Envelope::decode(self.encoding_buffer.as_slice())
343 .map_err(io::Error::from)?;
344
345 self.encoding_buffer.clear();
346 self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
347 return Ok(Message::Envelope(envelope));
348 }
349 WebSocketMessage::Ping(_) => return Ok(Message::Ping),
350 WebSocketMessage::Pong(_) => return Ok(Message::Pong),
351 WebSocketMessage::Close(_) => break,
352 _ => {}
353 }
354 }
355 Err(anyhow!("connection closed"))
356 }
357}
358
359impl Into<SystemTime> for Timestamp {
360 fn into(self) -> SystemTime {
361 UNIX_EPOCH
362 .checked_add(Duration::new(self.seconds, self.nanos))
363 .unwrap()
364 }
365}
366
367impl From<SystemTime> for Timestamp {
368 fn from(time: SystemTime) -> Self {
369 let duration = time.duration_since(UNIX_EPOCH).unwrap();
370 Self {
371 seconds: duration.as_secs(),
372 nanos: duration.subsec_nanos(),
373 }
374 }
375}
376
377impl From<u128> for Nonce {
378 fn from(nonce: u128) -> Self {
379 let upper_half = (nonce >> 64) as u64;
380 let lower_half = nonce as u64;
381 Self {
382 upper_half,
383 lower_half,
384 }
385 }
386}
387
388impl From<Nonce> for u128 {
389 fn from(nonce: Nonce) -> Self {
390 let upper_half = (nonce.upper_half as u128) << 64;
391 let lower_half = nonce.lower_half as u128;
392 upper_half | lower_half
393 }
394}
395
396pub fn split_worktree_update(
397 mut message: UpdateWorktree,
398 max_chunk_size: usize,
399) -> impl Iterator<Item = UpdateWorktree> {
400 let mut done = false;
401 iter::from_fn(move || {
402 if done {
403 return None;
404 }
405
406 let chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
407 let updated_entries = message.updated_entries.drain(..chunk_size).collect();
408 done = message.updated_entries.is_empty();
409 Some(UpdateWorktree {
410 project_id: message.project_id,
411 worktree_id: message.worktree_id,
412 root_name: message.root_name.clone(),
413 updated_entries,
414 removed_entries: mem::take(&mut message.removed_entries),
415 scan_id: message.scan_id,
416 is_last_update: done && message.is_last_update,
417 })
418 })
419}
420
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an envelope whose `UpdateWorktree` payload has a `root_name` made of
    /// `repeats` copies of "abcdefg"; all other fields keep their defaults.
    fn worktree_envelope(repeats: usize) -> Message {
        let update = UpdateWorktree {
            root_name: "abcdefg".repeat(repeats),
            ..Default::default()
        };
        Message::Envelope(Envelope {
            payload: Some(envelope::Payload::UpdateWorktree(update)),
            ..Default::default()
        })
    }

    /// Writes a small and then a very large envelope and reads both back, checking
    /// after every step that the scratch buffer's capacity stays within
    /// `MAX_BUFFER_LEN`.
    #[gpui::test]
    async fn test_buffer_size() {
        let (tx, rx) = futures::channel::mpsc::unbounded();

        let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
        for repeats in [10, 1000000] {
            sink.write(worktree_envelope(repeats)).await.unwrap();
            assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
        }

        let mut stream = MessageStream::new(rx.map(|msg| anyhow::Ok(msg)));
        for _ in 0..2 {
            stream.read().await.unwrap();
            assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
        }
    }
}