1use super::{entity_messages, messages, request_messages, ConnectionId, PeerId, TypedEnvelope};
2use anyhow::{anyhow, Result};
3use async_tungstenite::tungstenite::Message as WebSocketMessage;
4use futures::{SinkExt as _, StreamExt as _};
5use prost::Message as _;
6use serde::Serialize;
7use std::any::{Any, TypeId};
8use std::{
9 fmt::Debug,
10 io,
11 time::{Duration, SystemTime, UNIX_EPOCH},
12};
13
14include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
15
16pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
17 const NAME: &'static str;
18 const PRIORITY: MessagePriority;
19 fn into_envelope(
20 self,
21 id: u32,
22 responding_to: Option<u32>,
23 original_sender_id: Option<u32>,
24 ) -> Envelope;
25 fn from_envelope(envelope: Envelope) -> Option<Self>;
26}
27
28pub trait EntityMessage: EnvelopedMessage {
29 fn remote_entity_id(&self) -> u64;
30}
31
32pub trait RequestMessage: EnvelopedMessage {
33 type Response: EnvelopedMessage;
34}
35
36pub trait AnyTypedEnvelope: 'static + Send + Sync {
37 fn payload_type_id(&self) -> TypeId;
38 fn payload_type_name(&self) -> &'static str;
39 fn as_any(&self) -> &dyn Any;
40 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
41 fn is_background(&self) -> bool;
42 fn original_sender_id(&self) -> Option<PeerId>;
43}
44
45pub enum MessagePriority {
46 Foreground,
47 Background,
48}
49
50impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
51 fn payload_type_id(&self) -> TypeId {
52 TypeId::of::<T>()
53 }
54
55 fn payload_type_name(&self) -> &'static str {
56 T::NAME
57 }
58
59 fn as_any(&self) -> &dyn Any {
60 self
61 }
62
63 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
64 self
65 }
66
67 fn is_background(&self) -> bool {
68 matches!(T::PRIORITY, MessagePriority::Background)
69 }
70
71 fn original_sender_id(&self) -> Option<PeerId> {
72 self.original_sender_id
73 }
74}
75
76messages!(
77 (Ack, Foreground),
78 (AddProjectCollaborator, Foreground),
79 (ApplyCodeAction, Background),
80 (ApplyCodeActionResponse, Background),
81 (ApplyCompletionAdditionalEdits, Background),
82 (ApplyCompletionAdditionalEditsResponse, Background),
83 (BufferReloaded, Foreground),
84 (BufferSaved, Foreground),
85 (RemoveContact, Foreground),
86 (ChannelMessageSent, Foreground),
87 (CreateProjectEntry, Foreground),
88 (DeleteProjectEntry, Foreground),
89 (Error, Foreground),
90 (Follow, Foreground),
91 (FollowResponse, Foreground),
92 (FormatBuffers, Foreground),
93 (FormatBuffersResponse, Foreground),
94 (FuzzySearchUsers, Foreground),
95 (GetChannelMessages, Foreground),
96 (GetChannelMessagesResponse, Foreground),
97 (GetChannels, Foreground),
98 (GetChannelsResponse, Foreground),
99 (GetCodeActions, Background),
100 (GetCodeActionsResponse, Background),
101 (GetCompletions, Background),
102 (GetCompletionsResponse, Background),
103 (GetDefinition, Background),
104 (GetDefinitionResponse, Background),
105 (GetDocumentHighlights, Background),
106 (GetDocumentHighlightsResponse, Background),
107 (GetReferences, Background),
108 (GetReferencesResponse, Background),
109 (GetProjectSymbols, Background),
110 (GetProjectSymbolsResponse, Background),
111 (GetUsers, Foreground),
112 (UsersResponse, Foreground),
113 (JoinChannel, Foreground),
114 (JoinChannelResponse, Foreground),
115 (JoinProject, Foreground),
116 (JoinProjectResponse, Foreground),
117 (LeaveChannel, Foreground),
118 (LeaveProject, Foreground),
119 (OpenBufferById, Background),
120 (OpenBufferByPath, Background),
121 (OpenBufferForSymbol, Background),
122 (OpenBufferForSymbolResponse, Background),
123 (OpenBufferResponse, Background),
124 (PerformRename, Background),
125 (PerformRenameResponse, Background),
126 (PrepareRename, Background),
127 (PrepareRenameResponse, Background),
128 (ProjectEntryResponse, Foreground),
129 (RegisterProjectResponse, Foreground),
130 (Ping, Foreground),
131 (ProjectUnshared, Foreground),
132 (RegisterProject, Foreground),
133 (RegisterWorktree, Foreground),
134 (ReloadBuffers, Foreground),
135 (ReloadBuffersResponse, Foreground),
136 (RemoveProjectCollaborator, Foreground),
137 (RenameProjectEntry, Foreground),
138 (RequestContact, Foreground),
139 (RequestJoinProject, Foreground),
140 (RespondToContactRequest, Foreground),
141 (RespondToJoinProjectRequest, Foreground),
142 (SaveBuffer, Foreground),
143 (SearchProject, Background),
144 (SearchProjectResponse, Background),
145 (SendChannelMessage, Foreground),
146 (SendChannelMessageResponse, Foreground),
147 (StartLanguageServer, Foreground),
148 (Test, Foreground),
149 (Unfollow, Foreground),
150 (UnregisterProject, Foreground),
151 (UnregisterWorktree, Foreground),
152 (UpdateBuffer, Foreground),
153 (UpdateBufferFile, Foreground),
154 (UpdateContacts, Foreground),
155 (UpdateDiagnosticSummary, Foreground),
156 (UpdateFollowers, Foreground),
157 (UpdateLanguageServer, Foreground),
158 (UpdateWorktree, Foreground),
159);
160
161request_messages!(
162 (ApplyCodeAction, ApplyCodeActionResponse),
163 (
164 ApplyCompletionAdditionalEdits,
165 ApplyCompletionAdditionalEditsResponse
166 ),
167 (CreateProjectEntry, ProjectEntryResponse),
168 (DeleteProjectEntry, ProjectEntryResponse),
169 (Follow, FollowResponse),
170 (FormatBuffers, FormatBuffersResponse),
171 (GetChannelMessages, GetChannelMessagesResponse),
172 (GetChannels, GetChannelsResponse),
173 (GetCodeActions, GetCodeActionsResponse),
174 (GetCompletions, GetCompletionsResponse),
175 (GetDefinition, GetDefinitionResponse),
176 (GetDocumentHighlights, GetDocumentHighlightsResponse),
177 (GetReferences, GetReferencesResponse),
178 (GetProjectSymbols, GetProjectSymbolsResponse),
179 (FuzzySearchUsers, UsersResponse),
180 (GetUsers, UsersResponse),
181 (JoinChannel, JoinChannelResponse),
182 (JoinProject, JoinProjectResponse),
183 (OpenBufferById, OpenBufferResponse),
184 (OpenBufferByPath, OpenBufferResponse),
185 (OpenBufferForSymbol, OpenBufferForSymbolResponse),
186 (Ping, Ack),
187 (PerformRename, PerformRenameResponse),
188 (PrepareRename, PrepareRenameResponse),
189 (RegisterProject, RegisterProjectResponse),
190 (RegisterWorktree, Ack),
191 (ReloadBuffers, ReloadBuffersResponse),
192 (RequestContact, Ack),
193 (RemoveContact, Ack),
194 (RespondToContactRequest, Ack),
195 (RenameProjectEntry, ProjectEntryResponse),
196 (SaveBuffer, BufferSaved),
197 (SearchProject, SearchProjectResponse),
198 (SendChannelMessage, SendChannelMessageResponse),
199 (Test, Test),
200 (UpdateBuffer, Ack),
201 (UpdateWorktree, Ack),
202);
203
204entity_messages!(
205 project_id,
206 AddProjectCollaborator,
207 ApplyCodeAction,
208 ApplyCompletionAdditionalEdits,
209 BufferReloaded,
210 BufferSaved,
211 CreateProjectEntry,
212 RenameProjectEntry,
213 DeleteProjectEntry,
214 Follow,
215 FormatBuffers,
216 GetCodeActions,
217 GetCompletions,
218 GetDefinition,
219 GetDocumentHighlights,
220 GetReferences,
221 GetProjectSymbols,
222 JoinProject,
223 LeaveProject,
224 OpenBufferById,
225 OpenBufferByPath,
226 OpenBufferForSymbol,
227 PerformRename,
228 PrepareRename,
229 ProjectUnshared,
230 ReloadBuffers,
231 RemoveProjectCollaborator,
232 RequestJoinProject,
233 SaveBuffer,
234 SearchProject,
235 StartLanguageServer,
236 Unfollow,
237 UnregisterProject,
238 UnregisterWorktree,
239 UpdateBuffer,
240 UpdateBufferFile,
241 UpdateDiagnosticSummary,
242 UpdateFollowers,
243 UpdateLanguageServer,
244 RegisterWorktree,
245 UpdateWorktree,
246);
247
248entity_messages!(channel_id, ChannelMessageSent);
249
250/// A stream of protobuf messages.
251pub struct MessageStream<S> {
252 stream: S,
253 encoding_buffer: Vec<u8>,
254}
255
256#[derive(Debug)]
257pub enum Message {
258 Envelope(Envelope),
259 Ping,
260 Pong,
261}
262
263impl<S> MessageStream<S> {
264 pub fn new(stream: S) -> Self {
265 Self {
266 stream,
267 encoding_buffer: Vec::new(),
268 }
269 }
270
271 pub fn inner_mut(&mut self) -> &mut S {
272 &mut self.stream
273 }
274}
275
276impl<S> MessageStream<S>
277where
278 S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
279{
280 pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
281 #[cfg(any(test, feature = "test-support"))]
282 const COMPRESSION_LEVEL: i32 = -7;
283
284 #[cfg(not(any(test, feature = "test-support")))]
285 const COMPRESSION_LEVEL: i32 = 4;
286
287 match message {
288 Message::Envelope(message) => {
289 self.encoding_buffer.resize(message.encoded_len(), 0);
290 self.encoding_buffer.clear();
291 message
292 .encode(&mut self.encoding_buffer)
293 .map_err(|err| io::Error::from(err))?;
294 let buffer =
295 zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
296 .unwrap();
297 self.stream.send(WebSocketMessage::Binary(buffer)).await?;
298 }
299 Message::Ping => {
300 self.stream
301 .send(WebSocketMessage::Ping(Default::default()))
302 .await?;
303 }
304 Message::Pong => {
305 self.stream
306 .send(WebSocketMessage::Pong(Default::default()))
307 .await?;
308 }
309 }
310
311 Ok(())
312 }
313}
314
315impl<S> MessageStream<S>
316where
317 S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
318{
319 pub async fn read(&mut self) -> Result<Message, anyhow::Error> {
320 while let Some(bytes) = self.stream.next().await {
321 match bytes? {
322 WebSocketMessage::Binary(bytes) => {
323 self.encoding_buffer.clear();
324 zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
325 let envelope = Envelope::decode(self.encoding_buffer.as_slice())
326 .map_err(io::Error::from)?;
327 return Ok(Message::Envelope(envelope));
328 }
329 WebSocketMessage::Ping(_) => return Ok(Message::Ping),
330 WebSocketMessage::Pong(_) => return Ok(Message::Pong),
331 WebSocketMessage::Close(_) => break,
332 _ => {}
333 }
334 }
335 Err(anyhow!("connection closed"))
336 }
337}
338
339impl Into<SystemTime> for Timestamp {
340 fn into(self) -> SystemTime {
341 UNIX_EPOCH
342 .checked_add(Duration::new(self.seconds, self.nanos))
343 .unwrap()
344 }
345}
346
347impl From<SystemTime> for Timestamp {
348 fn from(time: SystemTime) -> Self {
349 let duration = time.duration_since(UNIX_EPOCH).unwrap();
350 Self {
351 seconds: duration.as_secs(),
352 nanos: duration.subsec_nanos(),
353 }
354 }
355}
356
357impl From<u128> for Nonce {
358 fn from(nonce: u128) -> Self {
359 let upper_half = (nonce >> 64) as u64;
360 let lower_half = nonce as u64;
361 Self {
362 upper_half,
363 lower_half,
364 }
365 }
366}
367
368impl From<Nonce> for u128 {
369 fn from(nonce: Nonce) -> Self {
370 let upper_half = (nonce.upper_half as u128) << 64;
371 let lower_half = nonce.lower_half as u128;
372 upper_half | lower_half
373 }
374}