1use super::{ConnectionId, PeerId, TypedEnvelope};
2use anyhow::Result;
3use async_tungstenite::tungstenite::{Error as WebSocketError, Message as WebSocketMessage};
4use futures::{SinkExt as _, StreamExt as _};
5use prost::Message;
6use std::any::{Any, TypeId};
7use std::{
8 io,
9 time::{Duration, SystemTime, UNIX_EPOCH},
10};
11
12include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
13
14pub trait EnvelopedMessage: Clone + Sized + Send + Sync + 'static {
15 const NAME: &'static str;
16 const PRIORITY: MessagePriority;
17 fn into_envelope(
18 self,
19 id: u32,
20 responding_to: Option<u32>,
21 original_sender_id: Option<u32>,
22 ) -> Envelope;
23 fn from_envelope(envelope: Envelope) -> Option<Self>;
24}
25
26pub trait EntityMessage: EnvelopedMessage {
27 fn remote_entity_id(&self) -> u64;
28}
29
30pub trait RequestMessage: EnvelopedMessage {
31 type Response: EnvelopedMessage;
32}
33
34pub trait AnyTypedEnvelope: 'static + Send + Sync {
35 fn payload_type_id(&self) -> TypeId;
36 fn payload_type_name(&self) -> &'static str;
37 fn as_any(&self) -> &dyn Any;
38 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
39 fn is_background(&self) -> bool;
40 fn original_sender_id(&self) -> Option<PeerId>;
41}
42
43pub enum MessagePriority {
44 Foreground,
45 Background,
46}
47
48impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
49 fn payload_type_id(&self) -> TypeId {
50 TypeId::of::<T>()
51 }
52
53 fn payload_type_name(&self) -> &'static str {
54 T::NAME
55 }
56
57 fn as_any(&self) -> &dyn Any {
58 self
59 }
60
61 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
62 self
63 }
64
65 fn is_background(&self) -> bool {
66 matches!(T::PRIORITY, MessagePriority::Background)
67 }
68
69 fn original_sender_id(&self) -> Option<PeerId> {
70 self.original_sender_id
71 }
72}
73
74macro_rules! messages {
75 ($(($name:ident, $priority:ident)),* $(,)?) => {
76 pub fn build_typed_envelope(sender_id: ConnectionId, envelope: Envelope) -> Option<Box<dyn AnyTypedEnvelope>> {
77 match envelope.payload {
78 $(Some(envelope::Payload::$name(payload)) => {
79 Some(Box::new(TypedEnvelope {
80 sender_id,
81 original_sender_id: envelope.original_sender_id.map(PeerId),
82 message_id: envelope.id,
83 payload,
84 }))
85 }, )*
86 _ => None
87 }
88 }
89
90 $(
91 impl EnvelopedMessage for $name {
92 const NAME: &'static str = std::stringify!($name);
93 const PRIORITY: MessagePriority = MessagePriority::$priority;
94
95 fn into_envelope(
96 self,
97 id: u32,
98 responding_to: Option<u32>,
99 original_sender_id: Option<u32>,
100 ) -> Envelope {
101 Envelope {
102 id,
103 responding_to,
104 original_sender_id,
105 payload: Some(envelope::Payload::$name(self)),
106 }
107 }
108
109 fn from_envelope(envelope: Envelope) -> Option<Self> {
110 if let Some(envelope::Payload::$name(msg)) = envelope.payload {
111 Some(msg)
112 } else {
113 None
114 }
115 }
116 }
117 )*
118 };
119}
120
121macro_rules! request_messages {
122 ($(($request_name:ident, $response_name:ident)),* $(,)?) => {
123 $(impl RequestMessage for $request_name {
124 type Response = $response_name;
125 })*
126 };
127}
128
129macro_rules! entity_messages {
130 ($id_field:ident, $($name:ident),* $(,)?) => {
131 $(impl EntityMessage for $name {
132 fn remote_entity_id(&self) -> u64 {
133 self.$id_field
134 }
135 })*
136 };
137}
138
139messages!(
140 (Ack, Foreground),
141 (AddProjectCollaborator, Foreground),
142 (ApplyCodeAction, Foreground),
143 (ApplyCodeActionResponse, Foreground),
144 (ApplyCompletionAdditionalEdits, Foreground),
145 (ApplyCompletionAdditionalEditsResponse, Foreground),
146 (BufferReloaded, Foreground),
147 (BufferSaved, Foreground),
148 (ChannelMessageSent, Foreground),
149 (CloseBuffer, Foreground),
150 (DiskBasedDiagnosticsUpdated, Background),
151 (DiskBasedDiagnosticsUpdating, Background),
152 (Error, Foreground),
153 (FormatBuffers, Foreground),
154 (FormatBuffersResponse, Foreground),
155 (GetChannelMessages, Foreground),
156 (GetChannelMessagesResponse, Foreground),
157 (GetChannels, Foreground),
158 (GetChannelsResponse, Foreground),
159 (GetCodeActions, Background),
160 (GetCodeActionsResponse, Foreground),
161 (GetCompletions, Background),
162 (GetCompletionsResponse, Foreground),
163 (GetDefinition, Foreground),
164 (GetDefinitionResponse, Foreground),
165 (GetDocumentHighlights, Background),
166 (GetDocumentHighlightsResponse, Background),
167 (GetReferences, Foreground),
168 (GetReferencesResponse, Foreground),
169 (GetProjectSymbols, Background),
170 (GetProjectSymbolsResponse, Background),
171 (GetUsers, Foreground),
172 (GetUsersResponse, Foreground),
173 (JoinChannel, Foreground),
174 (JoinChannelResponse, Foreground),
175 (JoinProject, Foreground),
176 (JoinProjectResponse, Foreground),
177 (LeaveChannel, Foreground),
178 (LeaveProject, Foreground),
179 (OpenBuffer, Foreground),
180 (OpenBufferForSymbol, Foreground),
181 (OpenBufferForSymbolResponse, Foreground),
182 (OpenBufferResponse, Foreground),
183 (PerformRename, Background),
184 (PerformRenameResponse, Background),
185 (PrepareRename, Background),
186 (PrepareRenameResponse, Background),
187 (RegisterProjectResponse, Foreground),
188 (Ping, Foreground),
189 (RegisterProject, Foreground),
190 (RegisterWorktree, Foreground),
191 (RemoveProjectCollaborator, Foreground),
192 (SaveBuffer, Foreground),
193 (SearchProject, Foreground),
194 (SearchProjectResponse, Foreground),
195 (SendChannelMessage, Foreground),
196 (SendChannelMessageResponse, Foreground),
197 (ShareProject, Foreground),
198 (Test, Foreground),
199 (UnregisterProject, Foreground),
200 (UnregisterWorktree, Foreground),
201 (UnshareProject, Foreground),
202 (UpdateBuffer, Foreground),
203 (UpdateBufferFile, Foreground),
204 (UpdateContacts, Foreground),
205 (UpdateDiagnosticSummary, Foreground),
206 (UpdateWorktree, Foreground),
207);
208
209request_messages!(
210 (ApplyCodeAction, ApplyCodeActionResponse),
211 (
212 ApplyCompletionAdditionalEdits,
213 ApplyCompletionAdditionalEditsResponse
214 ),
215 (FormatBuffers, FormatBuffersResponse),
216 (GetChannelMessages, GetChannelMessagesResponse),
217 (GetChannels, GetChannelsResponse),
218 (GetCodeActions, GetCodeActionsResponse),
219 (GetCompletions, GetCompletionsResponse),
220 (GetDefinition, GetDefinitionResponse),
221 (GetDocumentHighlights, GetDocumentHighlightsResponse),
222 (GetReferences, GetReferencesResponse),
223 (GetProjectSymbols, GetProjectSymbolsResponse),
224 (GetUsers, GetUsersResponse),
225 (JoinChannel, JoinChannelResponse),
226 (JoinProject, JoinProjectResponse),
227 (OpenBuffer, OpenBufferResponse),
228 (OpenBufferForSymbol, OpenBufferForSymbolResponse),
229 (Ping, Ack),
230 (PerformRename, PerformRenameResponse),
231 (PrepareRename, PrepareRenameResponse),
232 (RegisterProject, RegisterProjectResponse),
233 (RegisterWorktree, Ack),
234 (SaveBuffer, BufferSaved),
235 (SearchProject, SearchProjectResponse),
236 (SendChannelMessage, SendChannelMessageResponse),
237 (ShareProject, Ack),
238 (Test, Test),
239 (UpdateBuffer, Ack),
240 (UpdateWorktree, Ack),
241);
242
243entity_messages!(
244 project_id,
245 AddProjectCollaborator,
246 ApplyCodeAction,
247 ApplyCompletionAdditionalEdits,
248 BufferReloaded,
249 BufferSaved,
250 CloseBuffer,
251 DiskBasedDiagnosticsUpdated,
252 DiskBasedDiagnosticsUpdating,
253 FormatBuffers,
254 GetCodeActions,
255 GetCompletions,
256 GetDefinition,
257 GetDocumentHighlights,
258 GetReferences,
259 GetProjectSymbols,
260 JoinProject,
261 LeaveProject,
262 OpenBuffer,
263 OpenBufferForSymbol,
264 PerformRename,
265 PrepareRename,
266 RemoveProjectCollaborator,
267 SaveBuffer,
268 SearchProject,
269 UnregisterWorktree,
270 UnshareProject,
271 UpdateBuffer,
272 UpdateBufferFile,
273 UpdateDiagnosticSummary,
274 RegisterWorktree,
275 UpdateWorktree,
276);
277
278entity_messages!(channel_id, ChannelMessageSent);
279
280/// A stream of protobuf messages.
281pub struct MessageStream<S> {
282 stream: S,
283 encoding_buffer: Vec<u8>,
284}
285
286impl<S> MessageStream<S> {
287 pub fn new(stream: S) -> Self {
288 Self {
289 stream,
290 encoding_buffer: Vec::new(),
291 }
292 }
293
294 pub fn inner_mut(&mut self) -> &mut S {
295 &mut self.stream
296 }
297}
298
299impl<S> MessageStream<S>
300where
301 S: futures::Sink<WebSocketMessage, Error = WebSocketError> + Unpin,
302{
303 /// Write a given protobuf message to the stream.
304 pub async fn write_message(&mut self, message: &Envelope) -> Result<(), WebSocketError> {
305 #[cfg(any(test, feature = "test-support"))]
306 const COMPRESSION_LEVEL: i32 = -7;
307
308 #[cfg(not(any(test, feature = "test-support")))]
309 const COMPRESSION_LEVEL: i32 = 4;
310
311 self.encoding_buffer.resize(message.encoded_len(), 0);
312 self.encoding_buffer.clear();
313 message
314 .encode(&mut self.encoding_buffer)
315 .map_err(|err| io::Error::from(err))?;
316 let buffer =
317 zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL).unwrap();
318 self.stream.send(WebSocketMessage::Binary(buffer)).await?;
319 Ok(())
320 }
321}
322
323impl<S> MessageStream<S>
324where
325 S: futures::Stream<Item = Result<WebSocketMessage, WebSocketError>> + Unpin,
326{
327 /// Read a protobuf message of the given type from the stream.
328 pub async fn read_message(&mut self) -> Result<Envelope, WebSocketError> {
329 while let Some(bytes) = self.stream.next().await {
330 match bytes? {
331 WebSocketMessage::Binary(bytes) => {
332 self.encoding_buffer.clear();
333 zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
334 let envelope = Envelope::decode(self.encoding_buffer.as_slice())
335 .map_err(io::Error::from)?;
336 return Ok(envelope);
337 }
338 WebSocketMessage::Close(_) => break,
339 _ => {}
340 }
341 }
342 Err(WebSocketError::ConnectionClosed)
343 }
344}
345
346impl Into<SystemTime> for Timestamp {
347 fn into(self) -> SystemTime {
348 UNIX_EPOCH
349 .checked_add(Duration::new(self.seconds, self.nanos))
350 .unwrap()
351 }
352}
353
354impl From<SystemTime> for Timestamp {
355 fn from(time: SystemTime) -> Self {
356 let duration = time.duration_since(UNIX_EPOCH).unwrap();
357 Self {
358 seconds: duration.as_secs(),
359 nanos: duration.subsec_nanos(),
360 }
361 }
362}
363
364impl From<u128> for Nonce {
365 fn from(nonce: u128) -> Self {
366 let upper_half = (nonce >> 64) as u64;
367 let lower_half = nonce as u64;
368 Self {
369 upper_half,
370 lower_half,
371 }
372 }
373}
374
375impl From<Nonce> for u128 {
376 fn from(nonce: Nonce) -> Self {
377 let upper_half = (nonce.upper_half as u128) << 64;
378 let lower_half = nonce.lower_half as u128;
379 upper_half | lower_half
380 }
381}