1use super::{entity_messages, messages, request_messages, ConnectionId, PeerId, TypedEnvelope};
2use anyhow::{anyhow, Result};
3use async_tungstenite::tungstenite::Message as WebSocketMessage;
4use futures::{SinkExt as _, StreamExt as _};
5use prost::Message as _;
6use serde::Serialize;
7use std::any::{Any, TypeId};
8use std::{
9 fmt::Debug,
10 io,
11 time::{Duration, SystemTime, UNIX_EPOCH},
12};
13
14include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
15
16pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
17 const NAME: &'static str;
18 const PRIORITY: MessagePriority;
19 fn into_envelope(
20 self,
21 id: u32,
22 responding_to: Option<u32>,
23 original_sender_id: Option<u32>,
24 ) -> Envelope;
25 fn from_envelope(envelope: Envelope) -> Option<Self>;
26}
27
28pub trait EntityMessage: EnvelopedMessage {
29 fn remote_entity_id(&self) -> u64;
30}
31
32pub trait RequestMessage: EnvelopedMessage {
33 type Response: EnvelopedMessage;
34}
35
36pub trait AnyTypedEnvelope: 'static + Send + Sync {
37 fn payload_type_id(&self) -> TypeId;
38 fn payload_type_name(&self) -> &'static str;
39 fn as_any(&self) -> &dyn Any;
40 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
41 fn is_background(&self) -> bool;
42 fn original_sender_id(&self) -> Option<PeerId>;
43}
44
/// Priority class attached to each message type through
/// `EnvelopedMessage::PRIORITY`.
///
/// NOTE(review): how the two classes are scheduled is decided by code outside
/// this file; here the value is only inspected by
/// `AnyTypedEnvelope::is_background`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MessagePriority {
    /// Not a background message (`is_background` reports `false`).
    Foreground,
    /// A background message (`is_background` reports `true`).
    Background,
}
49
50impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
51 fn payload_type_id(&self) -> TypeId {
52 TypeId::of::<T>()
53 }
54
55 fn payload_type_name(&self) -> &'static str {
56 T::NAME
57 }
58
59 fn as_any(&self) -> &dyn Any {
60 self
61 }
62
63 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
64 self
65 }
66
67 fn is_background(&self) -> bool {
68 matches!(T::PRIORITY, MessagePriority::Background)
69 }
70
71 fn original_sender_id(&self) -> Option<PeerId> {
72 self.original_sender_id
73 }
74}
75
76messages!(
77 (Ack, Foreground),
78 (AddProjectCollaborator, Foreground),
79 (ApplyCodeAction, Background),
80 (ApplyCodeActionResponse, Background),
81 (ApplyCompletionAdditionalEdits, Background),
82 (ApplyCompletionAdditionalEditsResponse, Background),
83 (BufferReloaded, Foreground),
84 (BufferSaved, Foreground),
85 (RemoveContact, Foreground),
86 (ChannelMessageSent, Foreground),
87 (CreateProjectEntry, Foreground),
88 (DeleteProjectEntry, Foreground),
89 (Error, Foreground),
90 (Follow, Foreground),
91 (FollowResponse, Foreground),
92 (FormatBuffers, Foreground),
93 (FormatBuffersResponse, Foreground),
94 (FuzzySearchUsers, Foreground),
95 (GetChannelMessages, Foreground),
96 (GetChannelMessagesResponse, Foreground),
97 (GetChannels, Foreground),
98 (GetChannelsResponse, Foreground),
99 (GetCodeActions, Background),
100 (GetCodeActionsResponse, Background),
101 (GetCompletions, Background),
102 (GetCompletionsResponse, Background),
103 (GetDefinition, Background),
104 (GetDefinitionResponse, Background),
105 (GetDocumentHighlights, Background),
106 (GetDocumentHighlightsResponse, Background),
107 (GetReferences, Background),
108 (GetReferencesResponse, Background),
109 (GetProjectSymbols, Background),
110 (GetProjectSymbolsResponse, Background),
111 (GetUsers, Foreground),
112 (UsersResponse, Foreground),
113 (JoinChannel, Foreground),
114 (JoinChannelResponse, Foreground),
115 (JoinProject, Foreground),
116 (JoinProjectResponse, Foreground),
117 (LeaveChannel, Foreground),
118 (LeaveProject, Foreground),
119 (OpenBufferById, Background),
120 (OpenBufferByPath, Background),
121 (OpenBufferForSymbol, Background),
122 (OpenBufferForSymbolResponse, Background),
123 (OpenBufferResponse, Background),
124 (PerformRename, Background),
125 (PerformRenameResponse, Background),
126 (PrepareRename, Background),
127 (PrepareRenameResponse, Background),
128 (ProjectEntryResponse, Foreground),
129 (RegisterProjectResponse, Foreground),
130 (Ping, Foreground),
131 (RegisterProject, Foreground),
132 (RegisterWorktree, Foreground),
133 (ReloadBuffers, Foreground),
134 (ReloadBuffersResponse, Foreground),
135 (RemoveProjectCollaborator, Foreground),
136 (RenameProjectEntry, Foreground),
137 (RequestContact, Foreground),
138 (RespondToContactRequest, Foreground),
139 (SaveBuffer, Foreground),
140 (SearchProject, Background),
141 (SearchProjectResponse, Background),
142 (SendChannelMessage, Foreground),
143 (SendChannelMessageResponse, Foreground),
144 (ShareProject, Foreground),
145 (StartLanguageServer, Foreground),
146 (Test, Foreground),
147 (Unfollow, Foreground),
148 (UnregisterProject, Foreground),
149 (UnregisterWorktree, Foreground),
150 (UnshareProject, Foreground),
151 (UpdateBuffer, Foreground),
152 (UpdateBufferFile, Foreground),
153 (UpdateContacts, Foreground),
154 (UpdateDiagnosticSummary, Foreground),
155 (UpdateFollowers, Foreground),
156 (UpdateLanguageServer, Foreground),
157 (UpdateWorktree, Foreground),
158);
159
160request_messages!(
161 (ApplyCodeAction, ApplyCodeActionResponse),
162 (
163 ApplyCompletionAdditionalEdits,
164 ApplyCompletionAdditionalEditsResponse
165 ),
166 (CreateProjectEntry, ProjectEntryResponse),
167 (DeleteProjectEntry, ProjectEntryResponse),
168 (Follow, FollowResponse),
169 (FormatBuffers, FormatBuffersResponse),
170 (GetChannelMessages, GetChannelMessagesResponse),
171 (GetChannels, GetChannelsResponse),
172 (GetCodeActions, GetCodeActionsResponse),
173 (GetCompletions, GetCompletionsResponse),
174 (GetDefinition, GetDefinitionResponse),
175 (GetDocumentHighlights, GetDocumentHighlightsResponse),
176 (GetReferences, GetReferencesResponse),
177 (GetProjectSymbols, GetProjectSymbolsResponse),
178 (FuzzySearchUsers, UsersResponse),
179 (GetUsers, UsersResponse),
180 (JoinChannel, JoinChannelResponse),
181 (JoinProject, JoinProjectResponse),
182 (OpenBufferById, OpenBufferResponse),
183 (OpenBufferByPath, OpenBufferResponse),
184 (OpenBufferForSymbol, OpenBufferForSymbolResponse),
185 (Ping, Ack),
186 (PerformRename, PerformRenameResponse),
187 (PrepareRename, PrepareRenameResponse),
188 (RegisterProject, RegisterProjectResponse),
189 (RegisterWorktree, Ack),
190 (ReloadBuffers, ReloadBuffersResponse),
191 (RequestContact, Ack),
192 (RemoveContact, Ack),
193 (RespondToContactRequest, Ack),
194 (RenameProjectEntry, ProjectEntryResponse),
195 (SaveBuffer, BufferSaved),
196 (SearchProject, SearchProjectResponse),
197 (SendChannelMessage, SendChannelMessageResponse),
198 (ShareProject, Ack),
199 (Test, Test),
200 (UpdateBuffer, Ack),
201 (UpdateWorktree, Ack),
202);
203
204entity_messages!(
205 project_id,
206 AddProjectCollaborator,
207 ApplyCodeAction,
208 ApplyCompletionAdditionalEdits,
209 BufferReloaded,
210 BufferSaved,
211 CreateProjectEntry,
212 RenameProjectEntry,
213 DeleteProjectEntry,
214 Follow,
215 FormatBuffers,
216 GetCodeActions,
217 GetCompletions,
218 GetDefinition,
219 GetDocumentHighlights,
220 GetReferences,
221 GetProjectSymbols,
222 JoinProject,
223 LeaveProject,
224 OpenBufferById,
225 OpenBufferByPath,
226 OpenBufferForSymbol,
227 PerformRename,
228 PrepareRename,
229 ReloadBuffers,
230 RemoveProjectCollaborator,
231 SaveBuffer,
232 SearchProject,
233 StartLanguageServer,
234 Unfollow,
235 UnregisterWorktree,
236 UnshareProject,
237 UpdateBuffer,
238 UpdateBufferFile,
239 UpdateDiagnosticSummary,
240 UpdateFollowers,
241 UpdateLanguageServer,
242 RegisterWorktree,
243 UpdateWorktree,
244);
245
246entity_messages!(channel_id, ChannelMessageSent);
247
/// A stream of protobuf messages layered over an underlying transport `S`.
///
/// `write` is available when `S` is a sink of WebSocket messages and `read`
/// when `S` is a stream of them.
pub struct MessageStream<S> {
    /// The wrapped transport.
    stream: S,
    /// Scratch buffer reused across calls: holds the encoded protobuf bytes
    /// before compression in `write` and the decompressed bytes in `read`.
    encoding_buffer: Vec<u8>,
}
253
254#[derive(Debug)]
255pub enum Message {
256 Envelope(Envelope),
257 Ping,
258 Pong,
259}
260
261impl<S> MessageStream<S> {
262 pub fn new(stream: S) -> Self {
263 Self {
264 stream,
265 encoding_buffer: Vec::new(),
266 }
267 }
268
269 pub fn inner_mut(&mut self) -> &mut S {
270 &mut self.stream
271 }
272}
273
274impl<S> MessageStream<S>
275where
276 S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
277{
278 pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
279 #[cfg(any(test, feature = "test-support"))]
280 const COMPRESSION_LEVEL: i32 = -7;
281
282 #[cfg(not(any(test, feature = "test-support")))]
283 const COMPRESSION_LEVEL: i32 = 4;
284
285 match message {
286 Message::Envelope(message) => {
287 self.encoding_buffer.resize(message.encoded_len(), 0);
288 self.encoding_buffer.clear();
289 message
290 .encode(&mut self.encoding_buffer)
291 .map_err(|err| io::Error::from(err))?;
292 let buffer =
293 zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
294 .unwrap();
295 self.stream.send(WebSocketMessage::Binary(buffer)).await?;
296 }
297 Message::Ping => {
298 self.stream
299 .send(WebSocketMessage::Ping(Default::default()))
300 .await?;
301 }
302 Message::Pong => {
303 self.stream
304 .send(WebSocketMessage::Pong(Default::default()))
305 .await?;
306 }
307 }
308
309 Ok(())
310 }
311}
312
313impl<S> MessageStream<S>
314where
315 S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
316{
317 pub async fn read(&mut self) -> Result<Message, anyhow::Error> {
318 while let Some(bytes) = self.stream.next().await {
319 match bytes? {
320 WebSocketMessage::Binary(bytes) => {
321 self.encoding_buffer.clear();
322 zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
323 let envelope = Envelope::decode(self.encoding_buffer.as_slice())
324 .map_err(io::Error::from)?;
325 return Ok(Message::Envelope(envelope));
326 }
327 WebSocketMessage::Ping(_) => return Ok(Message::Ping),
328 WebSocketMessage::Pong(_) => return Ok(Message::Pong),
329 WebSocketMessage::Close(_) => break,
330 _ => {}
331 }
332 }
333 Err(anyhow!("connection closed"))
334 }
335}
336
337impl Into<SystemTime> for Timestamp {
338 fn into(self) -> SystemTime {
339 UNIX_EPOCH
340 .checked_add(Duration::new(self.seconds, self.nanos))
341 .unwrap()
342 }
343}
344
345impl From<SystemTime> for Timestamp {
346 fn from(time: SystemTime) -> Self {
347 let duration = time.duration_since(UNIX_EPOCH).unwrap();
348 Self {
349 seconds: duration.as_secs(),
350 nanos: duration.subsec_nanos(),
351 }
352 }
353}
354
355impl From<u128> for Nonce {
356 fn from(nonce: u128) -> Self {
357 let upper_half = (nonce >> 64) as u64;
358 let lower_half = nonce as u64;
359 Self {
360 upper_half,
361 lower_half,
362 }
363 }
364}
365
366impl From<Nonce> for u128 {
367 fn from(nonce: Nonce) -> Self {
368 let upper_half = (nonce.upper_half as u128) << 64;
369 let lower_half = nonce.lower_half as u128;
370 upper_half | lower_half
371 }
372}