1use super::{entity_messages, messages, request_messages, ConnectionId, PeerId, TypedEnvelope};
2use anyhow::{anyhow, Result};
3use async_tungstenite::tungstenite::Message as WebSocketMessage;
4use futures::{SinkExt as _, StreamExt as _};
5use prost::Message as _;
6use serde::Serialize;
7use std::any::{Any, TypeId};
8use std::{
9 fmt::Debug,
10 io,
11 time::{Duration, SystemTime, UNIX_EPOCH},
12};
13
// Textually includes `zed.messages.rs` from the build's `OUT_DIR`.
// NOTE(review): presumably this is the protobuf code generated by the build
// script (it would be the source of `Envelope`, `Timestamp`, `Nonce` and the
// message structs used below) — confirm against `build.rs`.
include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
15
/// A message that can be wrapped into, and extracted back out of, an
/// [`Envelope`].
pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
    /// Static name of the message type; reported by
    /// `AnyTypedEnvelope::payload_type_name`.
    const NAME: &'static str;
    /// Priority class of the message; `AnyTypedEnvelope::is_background`
    /// reports whether this is `MessagePriority::Background`.
    const PRIORITY: MessagePriority;
    /// Consumes the message and wraps it in an [`Envelope`].
    ///
    /// NOTE(review): `id` is presumably the id assigned to this envelope,
    /// `responding_to` the id of the request being answered (if any), and
    /// `original_sender_id` the peer that first sent a forwarded message —
    /// confirm against the macro that implements this trait.
    fn into_envelope(
        self,
        id: u32,
        responding_to: Option<u32>,
        original_sender_id: Option<u32>,
    ) -> Envelope;
    /// Attempts to extract a message of this type from `envelope`.
    ///
    /// NOTE(review): presumably returns `None` when the envelope carries a
    /// different payload type — confirm against the implementing macro.
    fn from_envelope(envelope: Envelope) -> Option<Self>;
}
27
/// An [`EnvelopedMessage`] that is addressed to a remote entity.
pub trait EntityMessage: EnvelopedMessage {
    /// Returns the id of the remote entity this message refers to.
    ///
    /// NOTE(review): the `entity_messages!` invocations in this file pass a
    /// field name (`project_id`, `channel_id`); presumably that field supplies
    /// the value returned here — confirm against the macro definition.
    fn remote_entity_id(&self) -> u64;
}
31
/// An [`EnvelopedMessage`] that is sent as a request and is paired with a
/// response message type.
pub trait RequestMessage: EnvelopedMessage {
    /// The message type associated with this request as its response.
    type Response: EnvelopedMessage;
}
35
/// Object-safe, type-erased view of a `TypedEnvelope<T>`, exposing payload
/// metadata without naming the concrete payload type `T`.
pub trait AnyTypedEnvelope: 'static + Send + Sync {
    /// Returns the [`TypeId`] of the payload type.
    fn payload_type_id(&self) -> TypeId;
    /// Returns the static name of the payload type.
    fn payload_type_name(&self) -> &'static str;
    /// Borrows the envelope as `&dyn Any` so callers can downcast it.
    fn as_any(&self) -> &dyn Any;
    /// Converts the boxed envelope into `Box<dyn Any + Send + Sync>` so
    /// callers can downcast it by value.
    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
    /// Returns `true` when the payload type has background priority.
    fn is_background(&self) -> bool;
    /// Returns the original sender recorded on the envelope, if any.
    fn original_sender_id(&self) -> Option<PeerId>;
}
44
/// Priority class attached to every message type through
/// `EnvelopedMessage::PRIORITY`.
///
/// The standard value-type traits are derived so that priorities can be
/// logged, copied and compared directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MessagePriority {
    /// The message is not a background message
    /// (`AnyTypedEnvelope::is_background` returns `false`).
    Foreground,
    /// The message is a background message
    /// (`AnyTypedEnvelope::is_background` returns `true`).
    Background,
}
49
50impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
51 fn payload_type_id(&self) -> TypeId {
52 TypeId::of::<T>()
53 }
54
55 fn payload_type_name(&self) -> &'static str {
56 T::NAME
57 }
58
59 fn as_any(&self) -> &dyn Any {
60 self
61 }
62
63 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
64 self
65 }
66
67 fn is_background(&self) -> bool {
68 matches!(T::PRIORITY, MessagePriority::Background)
69 }
70
71 fn original_sender_id(&self) -> Option<PeerId> {
72 self.original_sender_id
73 }
74}
75
// Lists every message type together with its `MessagePriority` variant
// (`Foreground` or `Background`).
// NOTE(review): `messages!` is imported from the parent module; presumably it
// implements `EnvelopedMessage` for each listed type with the given priority —
// confirm against the macro definition.
messages!(
    (Ack, Foreground),
    (AddProjectCollaborator, Foreground),
    (ApplyCodeAction, Background),
    (ApplyCodeActionResponse, Background),
    (ApplyCompletionAdditionalEdits, Background),
    (ApplyCompletionAdditionalEditsResponse, Background),
    (BufferReloaded, Foreground),
    (BufferSaved, Foreground),
    (RemoveContact, Foreground),
    (ChannelMessageSent, Foreground),
    (CreateProjectEntry, Foreground),
    (DeleteProjectEntry, Foreground),
    (Error, Foreground),
    (Follow, Foreground),
    (FollowResponse, Foreground),
    (FormatBuffers, Foreground),
    (FormatBuffersResponse, Foreground),
    (FuzzySearchUsers, Foreground),
    (GetChannelMessages, Foreground),
    (GetChannelMessagesResponse, Foreground),
    (GetChannels, Foreground),
    (GetChannelsResponse, Foreground),
    (GetCodeActions, Background),
    (GetCodeActionsResponse, Background),
    (GetCompletions, Background),
    (GetCompletionsResponse, Background),
    (GetDefinition, Background),
    (GetDefinitionResponse, Background),
    (GetDocumentHighlights, Background),
    (GetDocumentHighlightsResponse, Background),
    (GetReferences, Background),
    (GetReferencesResponse, Background),
    (GetProjectSymbols, Background),
    (GetProjectSymbolsResponse, Background),
    (GetUsers, Foreground),
    (UsersResponse, Foreground),
    (JoinChannel, Foreground),
    (JoinChannelResponse, Foreground),
    (JoinProject, Foreground),
    (JoinProjectResponse, Foreground),
    (JoinProjectRequestCancelled, Foreground),
    (LeaveChannel, Foreground),
    (LeaveProject, Foreground),
    (OpenBufferById, Background),
    (OpenBufferByPath, Background),
    (OpenBufferForSymbol, Background),
    (OpenBufferForSymbolResponse, Background),
    (OpenBufferResponse, Background),
    (PerformRename, Background),
    (PerformRenameResponse, Background),
    (PrepareRename, Background),
    (PrepareRenameResponse, Background),
    (ProjectEntryResponse, Foreground),
    (RegisterProjectResponse, Foreground),
    (Ping, Foreground),
    (ProjectUnshared, Foreground),
    (RegisterProject, Foreground),
    (RegisterWorktree, Foreground),
    (ReloadBuffers, Foreground),
    (ReloadBuffersResponse, Foreground),
    (RemoveProjectCollaborator, Foreground),
    (RenameProjectEntry, Foreground),
    (RequestContact, Foreground),
    (RequestJoinProject, Foreground),
    (RespondToContactRequest, Foreground),
    (RespondToJoinProjectRequest, Foreground),
    (SaveBuffer, Foreground),
    (SearchProject, Background),
    (SearchProjectResponse, Background),
    (SendChannelMessage, Foreground),
    (SendChannelMessageResponse, Foreground),
    (ShowContacts, Foreground),
    (StartLanguageServer, Foreground),
    (Test, Foreground),
    (Unfollow, Foreground),
    (UnregisterProject, Foreground),
    (UnregisterWorktree, Foreground),
    (UpdateBuffer, Foreground),
    (UpdateBufferFile, Foreground),
    (UpdateContacts, Foreground),
    (UpdateDiagnosticSummary, Foreground),
    (UpdateFollowers, Foreground),
    (UpdateInviteInfo, Foreground),
    (UpdateLanguageServer, Foreground),
    (UpdateWorktree, Foreground),
);
163
// Pairs each request message type with the message type sent back in reply,
// written as `(Request, Response)`.
// NOTE(review): `request_messages!` is imported from the parent module;
// presumably it implements `RequestMessage` for the first type with the second
// as `Response` — confirm against the macro definition.
request_messages!(
    (ApplyCodeAction, ApplyCodeActionResponse),
    (
        ApplyCompletionAdditionalEdits,
        ApplyCompletionAdditionalEditsResponse
    ),
    (CreateProjectEntry, ProjectEntryResponse),
    (DeleteProjectEntry, ProjectEntryResponse),
    (Follow, FollowResponse),
    (FormatBuffers, FormatBuffersResponse),
    (GetChannelMessages, GetChannelMessagesResponse),
    (GetChannels, GetChannelsResponse),
    (GetCodeActions, GetCodeActionsResponse),
    (GetCompletions, GetCompletionsResponse),
    (GetDefinition, GetDefinitionResponse),
    (GetDocumentHighlights, GetDocumentHighlightsResponse),
    (GetReferences, GetReferencesResponse),
    (GetProjectSymbols, GetProjectSymbolsResponse),
    (FuzzySearchUsers, UsersResponse),
    (GetUsers, UsersResponse),
    (JoinChannel, JoinChannelResponse),
    (JoinProject, JoinProjectResponse),
    (OpenBufferById, OpenBufferResponse),
    (OpenBufferByPath, OpenBufferResponse),
    (OpenBufferForSymbol, OpenBufferForSymbolResponse),
    (Ping, Ack),
    (PerformRename, PerformRenameResponse),
    (PrepareRename, PrepareRenameResponse),
    (RegisterProject, RegisterProjectResponse),
    (RegisterWorktree, Ack),
    (ReloadBuffers, ReloadBuffersResponse),
    (RequestContact, Ack),
    (RemoveContact, Ack),
    (RespondToContactRequest, Ack),
    (RenameProjectEntry, ProjectEntryResponse),
    (SaveBuffer, BufferSaved),
    (SearchProject, SearchProjectResponse),
    (SendChannelMessage, SendChannelMessageResponse),
    (Test, Test),
    (UpdateBuffer, Ack),
    (UpdateWorktree, Ack),
);
206
// Lists the message types that are tied to a project; the first argument,
// `project_id`, is a field name rather than a message type.
// NOTE(review): `entity_messages!` is imported from the parent module;
// presumably it implements `EntityMessage` for each listed type, returning the
// named field from `remote_entity_id` — confirm against the macro definition.
entity_messages!(
    project_id,
    AddProjectCollaborator,
    ApplyCodeAction,
    ApplyCompletionAdditionalEdits,
    BufferReloaded,
    BufferSaved,
    CreateProjectEntry,
    RenameProjectEntry,
    DeleteProjectEntry,
    Follow,
    FormatBuffers,
    GetCodeActions,
    GetCompletions,
    GetDefinition,
    GetDocumentHighlights,
    GetReferences,
    GetProjectSymbols,
    JoinProject,
    JoinProjectRequestCancelled,
    LeaveProject,
    OpenBufferById,
    OpenBufferByPath,
    OpenBufferForSymbol,
    PerformRename,
    PrepareRename,
    ProjectUnshared,
    ReloadBuffers,
    RemoveProjectCollaborator,
    RequestJoinProject,
    SaveBuffer,
    SearchProject,
    StartLanguageServer,
    Unfollow,
    UnregisterProject,
    UnregisterWorktree,
    UpdateBuffer,
    UpdateBufferFile,
    UpdateDiagnosticSummary,
    UpdateFollowers,
    UpdateLanguageServer,
    RegisterWorktree,
    UpdateWorktree,
);
251
// Same registration as for project messages, but keyed by `channel_id`.
// NOTE(review): presumably makes `ChannelMessageSent` an `EntityMessage` whose
// entity id is its `channel_id` field — confirm against the macro definition.
entity_messages!(channel_id, ChannelMessageSent);
253
/// A stream of protobuf messages.
///
/// Wraps an underlying transport `S` and converts between [`Message`] values
/// and WebSocket frames (see `write` and `read`).
pub struct MessageStream<S> {
    /// The wrapped transport; also reachable through `inner_mut`.
    stream: S,
    /// Scratch buffer reused across calls: `write` encodes the outgoing
    /// envelope into it and `read` decompresses the incoming frame into it.
    encoding_buffer: Vec<u8>,
}
259
/// A unit of communication carried by a [`MessageStream`].
#[derive(Debug)]
pub enum Message {
    /// A protobuf `Envelope`; sent and received as a zstd-compressed binary
    /// WebSocket frame.
    Envelope(Envelope),
    /// Corresponds to a WebSocket ping frame.
    Ping,
    /// Corresponds to a WebSocket pong frame.
    Pong,
}
266
267impl<S> MessageStream<S> {
268 pub fn new(stream: S) -> Self {
269 Self {
270 stream,
271 encoding_buffer: Vec::new(),
272 }
273 }
274
275 pub fn inner_mut(&mut self) -> &mut S {
276 &mut self.stream
277 }
278}
279
280impl<S> MessageStream<S>
281where
282 S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
283{
284 pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
285 #[cfg(any(test, feature = "test-support"))]
286 const COMPRESSION_LEVEL: i32 = -7;
287
288 #[cfg(not(any(test, feature = "test-support")))]
289 const COMPRESSION_LEVEL: i32 = 4;
290
291 match message {
292 Message::Envelope(message) => {
293 self.encoding_buffer.resize(message.encoded_len(), 0);
294 self.encoding_buffer.clear();
295 message
296 .encode(&mut self.encoding_buffer)
297 .map_err(|err| io::Error::from(err))?;
298 let buffer =
299 zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
300 .unwrap();
301 self.stream.send(WebSocketMessage::Binary(buffer)).await?;
302 }
303 Message::Ping => {
304 self.stream
305 .send(WebSocketMessage::Ping(Default::default()))
306 .await?;
307 }
308 Message::Pong => {
309 self.stream
310 .send(WebSocketMessage::Pong(Default::default()))
311 .await?;
312 }
313 }
314
315 Ok(())
316 }
317}
318
319impl<S> MessageStream<S>
320where
321 S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
322{
323 pub async fn read(&mut self) -> Result<Message, anyhow::Error> {
324 while let Some(bytes) = self.stream.next().await {
325 match bytes? {
326 WebSocketMessage::Binary(bytes) => {
327 self.encoding_buffer.clear();
328 zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
329 let envelope = Envelope::decode(self.encoding_buffer.as_slice())
330 .map_err(io::Error::from)?;
331 return Ok(Message::Envelope(envelope));
332 }
333 WebSocketMessage::Ping(_) => return Ok(Message::Ping),
334 WebSocketMessage::Pong(_) => return Ok(Message::Pong),
335 WebSocketMessage::Close(_) => break,
336 _ => {}
337 }
338 }
339 Err(anyhow!("connection closed"))
340 }
341}
342
343impl Into<SystemTime> for Timestamp {
344 fn into(self) -> SystemTime {
345 UNIX_EPOCH
346 .checked_add(Duration::new(self.seconds, self.nanos))
347 .unwrap()
348 }
349}
350
351impl From<SystemTime> for Timestamp {
352 fn from(time: SystemTime) -> Self {
353 let duration = time.duration_since(UNIX_EPOCH).unwrap();
354 Self {
355 seconds: duration.as_secs(),
356 nanos: duration.subsec_nanos(),
357 }
358 }
359}
360
361impl From<u128> for Nonce {
362 fn from(nonce: u128) -> Self {
363 let upper_half = (nonce >> 64) as u64;
364 let lower_half = nonce as u64;
365 Self {
366 upper_half,
367 lower_half,
368 }
369 }
370}
371
372impl From<Nonce> for u128 {
373 fn from(nonce: Nonce) -> Self {
374 let upper_half = (nonce.upper_half as u128) << 64;
375 let lower_half = nonce.lower_half as u128;
376 upper_half | lower_half
377 }
378}