1use super::{entity_messages, messages, request_messages, ConnectionId, PeerId, TypedEnvelope};
2use anyhow::{anyhow, Result};
3use async_tungstenite::tungstenite::Message as WebSocketMessage;
4use futures::{SinkExt as _, StreamExt as _};
5use prost::Message as _;
6use serde::Serialize;
7use std::any::{Any, TypeId};
8use std::{
9 fmt::Debug,
10 io,
11 time::{Duration, SystemTime, UNIX_EPOCH},
12};
13
// Pulls in the Rust source that the build places at `$OUT_DIR/zed.messages.rs`.
// Presumably this is the prost-generated code defining `Envelope`, `Timestamp`,
// `Nonce` and the individual message structs used below — confirm against the
// build script.
include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
15
/// A message payload that can be wrapped in, and recovered from, an `Envelope`.
///
/// Implementations are presumably generated by the `messages!` invocation in this
/// file rather than written by hand — confirm against the macro definition.
pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
    /// Name of the message type; reported by `AnyTypedEnvelope::payload_type_name`.
    const NAME: &'static str;
    /// Priority class of the message; `AnyTypedEnvelope::is_background` reports
    /// `true` when this is `MessagePriority::Background`.
    const PRIORITY: MessagePriority;
    /// Consumes the payload and wraps it in an `Envelope`.
    ///
    /// `id`, `responding_to` and `original_sender_id` are passed through to the
    /// implementation; presumably they populate the envelope fields of the same
    /// names — confirm against the generated implementation.
    fn into_envelope(
        self,
        id: u32,
        responding_to: Option<u32>,
        original_sender_id: Option<u32>,
    ) -> Envelope;
    /// Attempts to extract a payload of this type from `envelope`.
    ///
    /// Presumably returns `None` when the envelope carries a different payload
    /// type — confirm against the generated implementation.
    fn from_envelope(envelope: Envelope) -> Option<Self>;
}
27
/// An [`EnvelopedMessage`] that refers to a remote entity identified by a `u64`.
///
/// Implementations presumably come from the `entity_messages!` invocations in this
/// file, which name the field (`project_id`, `channel_id`) used as the entity id —
/// confirm against the macro definition.
pub trait EntityMessage: EnvelopedMessage {
    /// Returns the id of the remote entity this message refers to.
    fn remote_entity_id(&self) -> u64;
}
31
/// An [`EnvelopedMessage`] that has an associated response message type.
///
/// The request/response pairings are listed in the `request_messages!` invocation
/// in this file; presumably that macro generates the implementations.
pub trait RequestMessage: EnvelopedMessage {
    /// The message type associated with this request as its response.
    type Response: EnvelopedMessage;
}
35
/// Type-erased view of a `TypedEnvelope<T>`, allowing envelopes with different
/// payload types to be handled through a single trait object.
pub trait AnyTypedEnvelope: 'static + Send + Sync {
    /// Returns the [`TypeId`] of the payload type.
    fn payload_type_id(&self) -> TypeId;
    /// Returns the payload type's name (`EnvelopedMessage::NAME`).
    fn payload_type_name(&self) -> &'static str;
    /// Borrows the envelope as `&dyn Any` so callers can downcast it.
    fn as_any(&self) -> &dyn Any;
    /// Converts the boxed envelope into a boxed `Any` so callers can downcast it
    /// by value.
    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
    /// Returns `true` when the payload's priority is `MessagePriority::Background`.
    fn is_background(&self) -> bool;
    /// Returns the envelope's `original_sender_id`, if any.
    fn original_sender_id(&self) -> Option<PeerId>;
}
44
/// Priority class attached to every message type via `EnvelopedMessage::PRIORITY`.
///
/// Within this file it is only inspected by `AnyTypedEnvelope::is_background`;
/// how callers schedule the two classes is not visible here.
pub enum MessagePriority {
    /// The message is not a background message (`is_background` returns `false`).
    Foreground,
    /// The message is a background message (`is_background` returns `true`).
    Background,
}
49
50impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
51 fn payload_type_id(&self) -> TypeId {
52 TypeId::of::<T>()
53 }
54
55 fn payload_type_name(&self) -> &'static str {
56 T::NAME
57 }
58
59 fn as_any(&self) -> &dyn Any {
60 self
61 }
62
63 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
64 self
65 }
66
67 fn is_background(&self) -> bool {
68 matches!(T::PRIORITY, MessagePriority::Background)
69 }
70
71 fn original_sender_id(&self) -> Option<PeerId> {
72 self.original_sender_id
73 }
74}
75
// Registers every message type together with its priority. The second element of
// each pair names a `MessagePriority` variant. Presumably the macro (imported from
// the parent module) implements `EnvelopedMessage` for each listed type — confirm
// against the macro definition.
messages!(
    (Ack, Foreground),
    (AddProjectCollaborator, Foreground),
    (ApplyCodeAction, Background),
    (ApplyCodeActionResponse, Background),
    (ApplyCompletionAdditionalEdits, Background),
    (ApplyCompletionAdditionalEditsResponse, Background),
    (BufferReloaded, Foreground),
    (BufferSaved, Foreground),
    (RemoveContact, Foreground),
    (ChannelMessageSent, Foreground),
    (CreateProjectEntry, Foreground),
    (DeleteProjectEntry, Foreground),
    (Error, Foreground),
    (Follow, Foreground),
    (FollowResponse, Foreground),
    (FormatBuffers, Foreground),
    (FormatBuffersResponse, Foreground),
    (FuzzySearchUsers, Foreground),
    (GetChannelMessages, Foreground),
    (GetChannelMessagesResponse, Foreground),
    (GetChannels, Foreground),
    (GetChannelsResponse, Foreground),
    (GetCodeActions, Background),
    (GetCodeActionsResponse, Background),
    (GetCompletions, Background),
    (GetCompletionsResponse, Background),
    (GetDefinition, Background),
    (GetDefinitionResponse, Background),
    (GetDocumentHighlights, Background),
    (GetDocumentHighlightsResponse, Background),
    (GetReferences, Background),
    (GetReferencesResponse, Background),
    (GetProjectSymbols, Background),
    (GetProjectSymbolsResponse, Background),
    (GetUsers, Foreground),
    (UsersResponse, Foreground),
    (JoinChannel, Foreground),
    (JoinChannelResponse, Foreground),
    (JoinProject, Foreground),
    (JoinProjectResponse, Foreground),
    (JoinProjectRequestCancelled, Foreground),
    (LeaveChannel, Foreground),
    (LeaveProject, Foreground),
    (OpenBufferById, Background),
    (OpenBufferByPath, Background),
    (OpenBufferForSymbol, Background),
    (OpenBufferForSymbolResponse, Background),
    (OpenBufferResponse, Background),
    (PerformRename, Background),
    (PerformRenameResponse, Background),
    (PrepareRename, Background),
    (PrepareRenameResponse, Background),
    (ProjectEntryResponse, Foreground),
    (RegisterProjectResponse, Foreground),
    (Ping, Foreground),
    (ProjectUnshared, Foreground),
    (RegisterProject, Foreground),
    (RegisterWorktree, Foreground),
    (ReloadBuffers, Foreground),
    (ReloadBuffersResponse, Foreground),
    (RemoveProjectCollaborator, Foreground),
    (RenameProjectEntry, Foreground),
    (RequestContact, Foreground),
    (RequestJoinProject, Foreground),
    (RespondToContactRequest, Foreground),
    (RespondToJoinProjectRequest, Foreground),
    (SaveBuffer, Foreground),
    (SearchProject, Background),
    (SearchProjectResponse, Background),
    (SendChannelMessage, Foreground),
    (SendChannelMessageResponse, Foreground),
    (StartLanguageServer, Foreground),
    (Test, Foreground),
    (Unfollow, Foreground),
    (UnregisterProject, Foreground),
    (UnregisterWorktree, Foreground),
    (UpdateBuffer, Foreground),
    (UpdateBufferFile, Foreground),
    (UpdateContacts, Foreground),
    (UpdateDiagnosticSummary, Foreground),
    (UpdateFollowers, Foreground),
    (UpdateLanguageServer, Foreground),
    (UpdateWorktree, Foreground),
);
161
// Pairs each request message with its response type, written as
// `(Request, Response)`. Presumably the macro implements `RequestMessage` for the
// first element with `Response` set to the second — confirm against the macro
// definition. Several requests share a response type (e.g. `Ack`,
// `ProjectEntryResponse`, `UsersResponse`, `OpenBufferResponse`).
request_messages!(
    (ApplyCodeAction, ApplyCodeActionResponse),
    (
        ApplyCompletionAdditionalEdits,
        ApplyCompletionAdditionalEditsResponse
    ),
    (CreateProjectEntry, ProjectEntryResponse),
    (DeleteProjectEntry, ProjectEntryResponse),
    (Follow, FollowResponse),
    (FormatBuffers, FormatBuffersResponse),
    (GetChannelMessages, GetChannelMessagesResponse),
    (GetChannels, GetChannelsResponse),
    (GetCodeActions, GetCodeActionsResponse),
    (GetCompletions, GetCompletionsResponse),
    (GetDefinition, GetDefinitionResponse),
    (GetDocumentHighlights, GetDocumentHighlightsResponse),
    (GetReferences, GetReferencesResponse),
    (GetProjectSymbols, GetProjectSymbolsResponse),
    (FuzzySearchUsers, UsersResponse),
    (GetUsers, UsersResponse),
    (JoinChannel, JoinChannelResponse),
    (JoinProject, JoinProjectResponse),
    (OpenBufferById, OpenBufferResponse),
    (OpenBufferByPath, OpenBufferResponse),
    (OpenBufferForSymbol, OpenBufferForSymbolResponse),
    (Ping, Ack),
    (PerformRename, PerformRenameResponse),
    (PrepareRename, PrepareRenameResponse),
    (RegisterProject, RegisterProjectResponse),
    (RegisterWorktree, Ack),
    (ReloadBuffers, ReloadBuffersResponse),
    (RequestContact, Ack),
    (RemoveContact, Ack),
    (RespondToContactRequest, Ack),
    (RenameProjectEntry, ProjectEntryResponse),
    (SaveBuffer, BufferSaved),
    (SearchProject, SearchProjectResponse),
    (SendChannelMessage, SendChannelMessageResponse),
    (Test, Test),
    (UpdateBuffer, Ack),
    (UpdateWorktree, Ack),
);
204
// Lists the messages that are tied to a project. The first argument names the
// field (`project_id`); presumably the macro implements `EntityMessage` for each
// listed type by returning that field from `remote_entity_id` — confirm against
// the macro definition.
entity_messages!(
    project_id,
    AddProjectCollaborator,
    ApplyCodeAction,
    ApplyCompletionAdditionalEdits,
    BufferReloaded,
    BufferSaved,
    CreateProjectEntry,
    RenameProjectEntry,
    DeleteProjectEntry,
    Follow,
    FormatBuffers,
    GetCodeActions,
    GetCompletions,
    GetDefinition,
    GetDocumentHighlights,
    GetReferences,
    GetProjectSymbols,
    JoinProject,
    JoinProjectRequestCancelled,
    LeaveProject,
    OpenBufferById,
    OpenBufferByPath,
    OpenBufferForSymbol,
    PerformRename,
    PrepareRename,
    ProjectUnshared,
    ReloadBuffers,
    RemoveProjectCollaborator,
    RequestJoinProject,
    SaveBuffer,
    SearchProject,
    StartLanguageServer,
    Unfollow,
    UnregisterProject,
    UnregisterWorktree,
    UpdateBuffer,
    UpdateBufferFile,
    UpdateDiagnosticSummary,
    UpdateFollowers,
    UpdateLanguageServer,
    RegisterWorktree,
    UpdateWorktree,
);
249
// `ChannelMessageSent` is the only message listed with `channel_id` as its entity
// field; presumably `remote_entity_id` returns that field — confirm against the
// macro definition.
entity_messages!(channel_id, ChannelMessageSent);
251
/// A stream of protobuf messages.
///
/// Wraps an underlying transport `S`. When `S` is a sink of WebSocket messages the
/// stream can `write`; when `S` is a stream of WebSocket messages it can `read`.
/// Envelopes travel as zstd-compressed protobuf inside binary WebSocket frames.
pub struct MessageStream<S> {
    /// The wrapped transport.
    stream: S,
    /// Scratch buffer reused across calls: `write` encodes the outgoing envelope
    /// into it before compressing, and `read` decompresses incoming frames into
    /// it before decoding.
    encoding_buffer: Vec<u8>,
}
257
/// A single unit exchanged over a [`MessageStream`].
#[derive(Debug)]
pub enum Message {
    /// A protobuf `Envelope`; sent and received as a binary WebSocket frame.
    Envelope(Envelope),
    /// A WebSocket ping frame (written with an empty payload; any received
    /// payload is discarded).
    Ping,
    /// A WebSocket pong frame (written with an empty payload; any received
    /// payload is discarded).
    Pong,
}
264
265impl<S> MessageStream<S> {
266 pub fn new(stream: S) -> Self {
267 Self {
268 stream,
269 encoding_buffer: Vec::new(),
270 }
271 }
272
273 pub fn inner_mut(&mut self) -> &mut S {
274 &mut self.stream
275 }
276}
277
278impl<S> MessageStream<S>
279where
280 S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
281{
282 pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
283 #[cfg(any(test, feature = "test-support"))]
284 const COMPRESSION_LEVEL: i32 = -7;
285
286 #[cfg(not(any(test, feature = "test-support")))]
287 const COMPRESSION_LEVEL: i32 = 4;
288
289 match message {
290 Message::Envelope(message) => {
291 self.encoding_buffer.resize(message.encoded_len(), 0);
292 self.encoding_buffer.clear();
293 message
294 .encode(&mut self.encoding_buffer)
295 .map_err(|err| io::Error::from(err))?;
296 let buffer =
297 zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
298 .unwrap();
299 self.stream.send(WebSocketMessage::Binary(buffer)).await?;
300 }
301 Message::Ping => {
302 self.stream
303 .send(WebSocketMessage::Ping(Default::default()))
304 .await?;
305 }
306 Message::Pong => {
307 self.stream
308 .send(WebSocketMessage::Pong(Default::default()))
309 .await?;
310 }
311 }
312
313 Ok(())
314 }
315}
316
317impl<S> MessageStream<S>
318where
319 S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
320{
321 pub async fn read(&mut self) -> Result<Message, anyhow::Error> {
322 while let Some(bytes) = self.stream.next().await {
323 match bytes? {
324 WebSocketMessage::Binary(bytes) => {
325 self.encoding_buffer.clear();
326 zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
327 let envelope = Envelope::decode(self.encoding_buffer.as_slice())
328 .map_err(io::Error::from)?;
329 return Ok(Message::Envelope(envelope));
330 }
331 WebSocketMessage::Ping(_) => return Ok(Message::Ping),
332 WebSocketMessage::Pong(_) => return Ok(Message::Pong),
333 WebSocketMessage::Close(_) => break,
334 _ => {}
335 }
336 }
337 Err(anyhow!("connection closed"))
338 }
339}
340
341impl Into<SystemTime> for Timestamp {
342 fn into(self) -> SystemTime {
343 UNIX_EPOCH
344 .checked_add(Duration::new(self.seconds, self.nanos))
345 .unwrap()
346 }
347}
348
349impl From<SystemTime> for Timestamp {
350 fn from(time: SystemTime) -> Self {
351 let duration = time.duration_since(UNIX_EPOCH).unwrap();
352 Self {
353 seconds: duration.as_secs(),
354 nanos: duration.subsec_nanos(),
355 }
356 }
357}
358
359impl From<u128> for Nonce {
360 fn from(nonce: u128) -> Self {
361 let upper_half = (nonce >> 64) as u64;
362 let lower_half = nonce as u64;
363 Self {
364 upper_half,
365 lower_half,
366 }
367 }
368}
369
370impl From<Nonce> for u128 {
371 fn from(nonce: Nonce) -> Self {
372 let upper_half = (nonce.upper_half as u128) << 64;
373 let lower_half = nonce.lower_half as u128;
374 upper_half | lower_half
375 }
376}