1use super::{ConnectionId, PeerId, TypedEnvelope};
2use anyhow::Result;
3use async_tungstenite::tungstenite::{Error as WebSocketError, Message as WebSocketMessage};
4use futures::{SinkExt as _, StreamExt as _};
5use prost::Message;
6use std::any::{Any, TypeId};
7use std::{
8 io,
9 time::{Duration, SystemTime, UNIX_EPOCH},
10};
11
12include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
13
14pub trait EnvelopedMessage: Clone + Sized + Send + Sync + 'static {
15 const NAME: &'static str;
16 const PRIORITY: MessagePriority;
17 fn into_envelope(
18 self,
19 id: u32,
20 responding_to: Option<u32>,
21 original_sender_id: Option<u32>,
22 ) -> Envelope;
23 fn from_envelope(envelope: Envelope) -> Option<Self>;
24}
25
26pub trait EntityMessage: EnvelopedMessage {
27 fn remote_entity_id(&self) -> u64;
28}
29
30pub trait RequestMessage: EnvelopedMessage {
31 type Response: EnvelopedMessage;
32}
33
34pub trait AnyTypedEnvelope: 'static + Send + Sync {
35 fn payload_type_id(&self) -> TypeId;
36 fn payload_type_name(&self) -> &'static str;
37 fn as_any(&self) -> &dyn Any;
38 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
39 fn is_background(&self) -> bool;
40 fn original_sender_id(&self) -> Option<PeerId>;
41}
42
/// Priority class attached to every message type through
/// `EnvelopedMessage::PRIORITY`.
///
/// `AnyTypedEnvelope::is_background` reports whether a payload's class is
/// [`MessagePriority::Background`]. How callers act on the distinction is not
/// visible in this file.
///
/// The derives make the value printable, comparable and freely copyable; the
/// enum has no payload, so all of them are free.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum MessagePriority {
    /// Foreground priority class.
    Foreground,
    /// Background priority class.
    Background,
}
47
48impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
49 fn payload_type_id(&self) -> TypeId {
50 TypeId::of::<T>()
51 }
52
53 fn payload_type_name(&self) -> &'static str {
54 T::NAME
55 }
56
57 fn as_any(&self) -> &dyn Any {
58 self
59 }
60
61 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
62 self
63 }
64
65 fn is_background(&self) -> bool {
66 matches!(T::PRIORITY, MessagePriority::Background)
67 }
68
69 fn original_sender_id(&self) -> Option<PeerId> {
70 self.original_sender_id
71 }
72}
73
/// Generates the envelope glue for a list of `(MessageType, Priority)` pairs.
///
/// One invocation emits:
/// * `build_typed_envelope`, which turns a raw `Envelope` into a boxed
///   `TypedEnvelope<MessageType>` for any listed payload variant, and
/// * an `EnvelopedMessage` impl for every listed message type.
macro_rules! messages {
    ($(($name:ident, $priority:ident)),* $(,)?) => {
        /// Wraps `envelope` in a `TypedEnvelope` matching its payload variant.
        ///
        /// Returns `None` when the envelope has no payload or the payload is
        /// not one of the message types listed in the `messages!` invocation.
        pub fn build_typed_envelope(
            sender_id: ConnectionId,
            envelope: Envelope,
        ) -> Option<Box<dyn AnyTypedEnvelope>> {
            // Header fields are plain copies, so read them before the payload
            // is moved out of the envelope.
            let message_id = envelope.id;
            let original_sender_id = envelope.original_sender_id.map(PeerId);
            match envelope.payload {
                $(
                    Some(envelope::Payload::$name(payload)) => Some(Box::new(TypedEnvelope {
                        sender_id,
                        original_sender_id,
                        message_id,
                        payload,
                    })),
                )*
                _ => None,
            }
        }

        $(
            impl EnvelopedMessage for $name {
                const NAME: &'static str = std::stringify!($name);
                const PRIORITY: MessagePriority = MessagePriority::$priority;

                fn into_envelope(
                    self,
                    id: u32,
                    responding_to: Option<u32>,
                    original_sender_id: Option<u32>,
                ) -> Envelope {
                    let payload = Some(envelope::Payload::$name(self));
                    Envelope {
                        id,
                        responding_to,
                        original_sender_id,
                        payload,
                    }
                }

                fn from_envelope(envelope: Envelope) -> Option<Self> {
                    match envelope.payload {
                        Some(envelope::Payload::$name(message)) => Some(message),
                        _ => None,
                    }
                }
            }
        )*
    };
}
120
/// Implements `RequestMessage` for each `(Request, Response)` pair, setting
/// the request's associated `Response` type to the second element.
macro_rules! request_messages {
    ($(($request:ident, $response:ident)),* $(,)?) => {
        $(
            impl RequestMessage for $request {
                type Response = $response;
            }
        )*
    };
}
128
/// Implements `EntityMessage` for every listed message type by returning the
/// value of the field named by the first argument.
macro_rules! entity_messages {
    ($id_field:ident, $($message:ident),* $(,)?) => {
        $(
            impl EntityMessage for $message {
                fn remote_entity_id(&self) -> u64 {
                    self.$id_field
                }
            }
        )*
    };
}
138
139messages!(
140 (Ack, Foreground),
141 (AddProjectCollaborator, Foreground),
142 (ApplyCodeAction, Foreground),
143 (ApplyCodeActionResponse, Foreground),
144 (ApplyCompletionAdditionalEdits, Foreground),
145 (ApplyCompletionAdditionalEditsResponse, Foreground),
146 (BufferReloaded, Foreground),
147 (BufferSaved, Foreground),
148 (ChannelMessageSent, Foreground),
149 (CloseBuffer, Foreground),
150 (DiskBasedDiagnosticsUpdated, Background),
151 (DiskBasedDiagnosticsUpdating, Background),
152 (Error, Foreground),
153 (FormatBuffers, Foreground),
154 (FormatBuffersResponse, Foreground),
155 (GetChannelMessages, Foreground),
156 (GetChannelMessagesResponse, Foreground),
157 (GetChannels, Foreground),
158 (GetChannelsResponse, Foreground),
159 (GetCodeActions, Background),
160 (GetCodeActionsResponse, Foreground),
161 (GetCompletions, Background),
162 (GetCompletionsResponse, Foreground),
163 (GetDefinition, Foreground),
164 (GetDefinitionResponse, Foreground),
165 (GetDocumentHighlights, Background),
166 (GetDocumentHighlightsResponse, Background),
167 (GetReferences, Foreground),
168 (GetReferencesResponse, Foreground),
169 (GetProjectSymbols, Background),
170 (GetProjectSymbolsResponse, Background),
171 (GetUsers, Foreground),
172 (GetUsersResponse, Foreground),
173 (JoinChannel, Foreground),
174 (JoinChannelResponse, Foreground),
175 (JoinProject, Foreground),
176 (JoinProjectResponse, Foreground),
177 (LeaveChannel, Foreground),
178 (LeaveProject, Foreground),
179 (OpenBuffer, Foreground),
180 (OpenBufferForSymbol, Foreground),
181 (OpenBufferForSymbolResponse, Foreground),
182 (OpenBufferResponse, Foreground),
183 (PerformRename, Background),
184 (PerformRenameResponse, Background),
185 (PrepareRename, Background),
186 (PrepareRenameResponse, Background),
187 (RegisterProjectResponse, Foreground),
188 (Ping, Foreground),
189 (RegisterProject, Foreground),
190 (RegisterWorktree, Foreground),
191 (RemoveProjectCollaborator, Foreground),
192 (SaveBuffer, Foreground),
193 (SendChannelMessage, Foreground),
194 (SendChannelMessageResponse, Foreground),
195 (ShareProject, Foreground),
196 (Test, Foreground),
197 (UnregisterProject, Foreground),
198 (UnregisterWorktree, Foreground),
199 (UnshareProject, Foreground),
200 (UpdateBuffer, Foreground),
201 (UpdateBufferFile, Foreground),
202 (UpdateContacts, Foreground),
203 (UpdateDiagnosticSummary, Foreground),
204 (UpdateWorktree, Foreground),
205);
206
207request_messages!(
208 (ApplyCodeAction, ApplyCodeActionResponse),
209 (
210 ApplyCompletionAdditionalEdits,
211 ApplyCompletionAdditionalEditsResponse
212 ),
213 (FormatBuffers, FormatBuffersResponse),
214 (GetChannelMessages, GetChannelMessagesResponse),
215 (GetChannels, GetChannelsResponse),
216 (GetCodeActions, GetCodeActionsResponse),
217 (GetCompletions, GetCompletionsResponse),
218 (GetDefinition, GetDefinitionResponse),
219 (GetDocumentHighlights, GetDocumentHighlightsResponse),
220 (GetReferences, GetReferencesResponse),
221 (GetProjectSymbols, GetProjectSymbolsResponse),
222 (GetUsers, GetUsersResponse),
223 (JoinChannel, JoinChannelResponse),
224 (JoinProject, JoinProjectResponse),
225 (OpenBuffer, OpenBufferResponse),
226 (OpenBufferForSymbol, OpenBufferForSymbolResponse),
227 (Ping, Ack),
228 (PerformRename, PerformRenameResponse),
229 (PrepareRename, PrepareRenameResponse),
230 (RegisterProject, RegisterProjectResponse),
231 (RegisterWorktree, Ack),
232 (SaveBuffer, BufferSaved),
233 (SendChannelMessage, SendChannelMessageResponse),
234 (ShareProject, Ack),
235 (Test, Test),
236 (UpdateBuffer, Ack),
237 (UpdateWorktree, Ack),
238);
239
240entity_messages!(
241 project_id,
242 AddProjectCollaborator,
243 ApplyCodeAction,
244 ApplyCompletionAdditionalEdits,
245 BufferReloaded,
246 BufferSaved,
247 CloseBuffer,
248 DiskBasedDiagnosticsUpdated,
249 DiskBasedDiagnosticsUpdating,
250 FormatBuffers,
251 GetCodeActions,
252 GetCompletions,
253 GetDefinition,
254 GetDocumentHighlights,
255 GetReferences,
256 GetProjectSymbols,
257 JoinProject,
258 LeaveProject,
259 OpenBuffer,
260 OpenBufferForSymbol,
261 PerformRename,
262 PrepareRename,
263 RemoveProjectCollaborator,
264 SaveBuffer,
265 UnregisterWorktree,
266 UnshareProject,
267 UpdateBuffer,
268 UpdateBufferFile,
269 UpdateDiagnosticSummary,
270 RegisterWorktree,
271 UpdateWorktree,
272);
273
274entity_messages!(channel_id, ChannelMessageSent);
275
/// A stream of protobuf messages layered over an inner transport `S`.
pub struct MessageStream<S> {
    /// The wrapped transport.
    stream: S,
    /// Scratch buffer kept between calls so each message does not have to
    /// allocate a fresh one.
    encoding_buffer: Vec<u8>,
}

impl<S> MessageStream<S> {
    /// Wraps `stream`, starting with an empty scratch buffer.
    pub fn new(stream: S) -> Self {
        let encoding_buffer = Vec::new();
        MessageStream {
            stream,
            encoding_buffer,
        }
    }

    /// Gives mutable access to the wrapped transport.
    pub fn inner_mut(&mut self) -> &mut S {
        let MessageStream { stream, .. } = self;
        stream
    }
}
294
295impl<S> MessageStream<S>
296where
297 S: futures::Sink<WebSocketMessage, Error = WebSocketError> + Unpin,
298{
299 /// Write a given protobuf message to the stream.
300 pub async fn write_message(&mut self, message: &Envelope) -> Result<(), WebSocketError> {
301 #[cfg(any(test, feature = "test-support"))]
302 const COMPRESSION_LEVEL: i32 = -7;
303
304 #[cfg(not(any(test, feature = "test-support")))]
305 const COMPRESSION_LEVEL: i32 = 4;
306
307 self.encoding_buffer.resize(message.encoded_len(), 0);
308 self.encoding_buffer.clear();
309 message
310 .encode(&mut self.encoding_buffer)
311 .map_err(|err| io::Error::from(err))?;
312 let buffer =
313 zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL).unwrap();
314 self.stream.send(WebSocketMessage::Binary(buffer)).await?;
315 Ok(())
316 }
317}
318
319impl<S> MessageStream<S>
320where
321 S: futures::Stream<Item = Result<WebSocketMessage, WebSocketError>> + Unpin,
322{
323 /// Read a protobuf message of the given type from the stream.
324 pub async fn read_message(&mut self) -> Result<Envelope, WebSocketError> {
325 while let Some(bytes) = self.stream.next().await {
326 match bytes? {
327 WebSocketMessage::Binary(bytes) => {
328 self.encoding_buffer.clear();
329 zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
330 let envelope = Envelope::decode(self.encoding_buffer.as_slice())
331 .map_err(io::Error::from)?;
332 return Ok(envelope);
333 }
334 WebSocketMessage::Close(_) => break,
335 _ => {}
336 }
337 }
338 Err(WebSocketError::ConnectionClosed)
339 }
340}
341
342impl Into<SystemTime> for Timestamp {
343 fn into(self) -> SystemTime {
344 UNIX_EPOCH
345 .checked_add(Duration::new(self.seconds, self.nanos))
346 .unwrap()
347 }
348}
349
350impl From<SystemTime> for Timestamp {
351 fn from(time: SystemTime) -> Self {
352 let duration = time.duration_since(UNIX_EPOCH).unwrap();
353 Self {
354 seconds: duration.as_secs(),
355 nanos: duration.subsec_nanos(),
356 }
357 }
358}
359
360impl From<u128> for Nonce {
361 fn from(nonce: u128) -> Self {
362 let upper_half = (nonce >> 64) as u64;
363 let lower_half = nonce as u64;
364 Self {
365 upper_half,
366 lower_half,
367 }
368 }
369}
370
371impl From<Nonce> for u128 {
372 fn from(nonce: Nonce) -> Self {
373 let upper_half = (nonce.upper_half as u128) << 64;
374 let lower_half = nonce.lower_half as u128;
375 upper_half | lower_half
376 }
377}