1use super::{entity_messages, messages, request_messages, ConnectionId, PeerId, TypedEnvelope};
2use anyhow::{anyhow, Result};
3use async_tungstenite::tungstenite::Message as WebSocketMessage;
4use futures::{SinkExt as _, StreamExt as _};
5use prost::Message as _;
6use serde::Serialize;
7use std::any::{Any, TypeId};
8use std::{
9 fmt::Debug,
10 io,
11 time::{Duration, SystemTime, UNIX_EPOCH},
12};
13
14include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
15
16pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
17 const NAME: &'static str;
18 const PRIORITY: MessagePriority;
19 fn into_envelope(
20 self,
21 id: u32,
22 responding_to: Option<u32>,
23 original_sender_id: Option<u32>,
24 ) -> Envelope;
25 fn from_envelope(envelope: Envelope) -> Option<Self>;
26}
27
28pub trait EntityMessage: EnvelopedMessage {
29 fn remote_entity_id(&self) -> u64;
30}
31
32pub trait RequestMessage: EnvelopedMessage {
33 type Response: EnvelopedMessage;
34}
35
36pub trait AnyTypedEnvelope: 'static + Send + Sync {
37 fn payload_type_id(&self) -> TypeId;
38 fn payload_type_name(&self) -> &'static str;
39 fn as_any(&self) -> &dyn Any;
40 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
41 fn is_background(&self) -> bool;
42 fn original_sender_id(&self) -> Option<PeerId>;
43}
44
45pub enum MessagePriority {
46 Foreground,
47 Background,
48}
49
50impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
51 fn payload_type_id(&self) -> TypeId {
52 TypeId::of::<T>()
53 }
54
55 fn payload_type_name(&self) -> &'static str {
56 T::NAME
57 }
58
59 fn as_any(&self) -> &dyn Any {
60 self
61 }
62
63 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
64 self
65 }
66
67 fn is_background(&self) -> bool {
68 matches!(T::PRIORITY, MessagePriority::Background)
69 }
70
71 fn original_sender_id(&self) -> Option<PeerId> {
72 self.original_sender_id
73 }
74}
75
76messages!(
77 (Ack, Foreground),
78 (AddProjectCollaborator, Foreground),
79 (ApplyCodeAction, Background),
80 (ApplyCodeActionResponse, Background),
81 (ApplyCompletionAdditionalEdits, Background),
82 (ApplyCompletionAdditionalEditsResponse, Background),
83 (BufferReloaded, Foreground),
84 (BufferSaved, Foreground),
85 (ChannelMessageSent, Foreground),
86 (CreateProjectEntry, Foreground),
87 (DeleteProjectEntry, Foreground),
88 (Error, Foreground),
89 (Follow, Foreground),
90 (FollowResponse, Foreground),
91 (FormatBuffers, Foreground),
92 (FormatBuffersResponse, Foreground),
93 (FuzzySearchUsers, Foreground),
94 (GetChannelMessages, Foreground),
95 (GetChannelMessagesResponse, Foreground),
96 (GetChannels, Foreground),
97 (GetChannelsResponse, Foreground),
98 (GetCodeActions, Background),
99 (GetCodeActionsResponse, Background),
100 (GetCompletions, Background),
101 (GetCompletionsResponse, Background),
102 (GetDefinition, Background),
103 (GetDefinitionResponse, Background),
104 (GetDocumentHighlights, Background),
105 (GetDocumentHighlightsResponse, Background),
106 (GetReferences, Background),
107 (GetReferencesResponse, Background),
108 (GetProjectSymbols, Background),
109 (GetProjectSymbolsResponse, Background),
110 (GetUsers, Foreground),
111 (UsersResponse, Foreground),
112 (JoinChannel, Foreground),
113 (JoinChannelResponse, Foreground),
114 (JoinProject, Foreground),
115 (JoinProjectResponse, Foreground),
116 (LeaveChannel, Foreground),
117 (LeaveProject, Foreground),
118 (OpenBufferById, Background),
119 (OpenBufferByPath, Background),
120 (OpenBufferForSymbol, Background),
121 (OpenBufferForSymbolResponse, Background),
122 (OpenBufferResponse, Background),
123 (PerformRename, Background),
124 (PerformRenameResponse, Background),
125 (PrepareRename, Background),
126 (PrepareRenameResponse, Background),
127 (ProjectEntryResponse, Foreground),
128 (RegisterProjectResponse, Foreground),
129 (Ping, Foreground),
130 (RegisterProject, Foreground),
131 (RegisterWorktree, Foreground),
132 (ReloadBuffers, Foreground),
133 (ReloadBuffersResponse, Foreground),
134 (RemoveProjectCollaborator, Foreground),
135 (RenameProjectEntry, Foreground),
136 (RequestContact, Foreground),
137 (RespondToContactRequest, Foreground),
138 (SaveBuffer, Foreground),
139 (SearchProject, Background),
140 (SearchProjectResponse, Background),
141 (SendChannelMessage, Foreground),
142 (SendChannelMessageResponse, Foreground),
143 (ShareProject, Foreground),
144 (StartLanguageServer, Foreground),
145 (Test, Foreground),
146 (Unfollow, Foreground),
147 (UnregisterProject, Foreground),
148 (UnregisterWorktree, Foreground),
149 (UnshareProject, Foreground),
150 (UpdateBuffer, Foreground),
151 (UpdateBufferFile, Foreground),
152 (UpdateContacts, Foreground),
153 (UpdateDiagnosticSummary, Foreground),
154 (UpdateFollowers, Foreground),
155 (UpdateLanguageServer, Foreground),
156 (UpdateWorktree, Foreground),
157);
158
159request_messages!(
160 (ApplyCodeAction, ApplyCodeActionResponse),
161 (
162 ApplyCompletionAdditionalEdits,
163 ApplyCompletionAdditionalEditsResponse
164 ),
165 (CreateProjectEntry, ProjectEntryResponse),
166 (DeleteProjectEntry, ProjectEntryResponse),
167 (Follow, FollowResponse),
168 (FormatBuffers, FormatBuffersResponse),
169 (GetChannelMessages, GetChannelMessagesResponse),
170 (GetChannels, GetChannelsResponse),
171 (GetCodeActions, GetCodeActionsResponse),
172 (GetCompletions, GetCompletionsResponse),
173 (GetDefinition, GetDefinitionResponse),
174 (GetDocumentHighlights, GetDocumentHighlightsResponse),
175 (GetReferences, GetReferencesResponse),
176 (GetProjectSymbols, GetProjectSymbolsResponse),
177 (FuzzySearchUsers, UsersResponse),
178 (GetUsers, UsersResponse),
179 (JoinChannel, JoinChannelResponse),
180 (JoinProject, JoinProjectResponse),
181 (OpenBufferById, OpenBufferResponse),
182 (OpenBufferByPath, OpenBufferResponse),
183 (OpenBufferForSymbol, OpenBufferForSymbolResponse),
184 (Ping, Ack),
185 (PerformRename, PerformRenameResponse),
186 (PrepareRename, PrepareRenameResponse),
187 (RegisterProject, RegisterProjectResponse),
188 (RegisterWorktree, Ack),
189 (ReloadBuffers, ReloadBuffersResponse),
190 (RequestContact, Ack),
191 (RespondToContactRequest, Ack),
192 (RenameProjectEntry, ProjectEntryResponse),
193 (SaveBuffer, BufferSaved),
194 (SearchProject, SearchProjectResponse),
195 (SendChannelMessage, SendChannelMessageResponse),
196 (ShareProject, Ack),
197 (Test, Test),
198 (UpdateBuffer, Ack),
199 (UpdateWorktree, Ack),
200);
201
202entity_messages!(
203 project_id,
204 AddProjectCollaborator,
205 ApplyCodeAction,
206 ApplyCompletionAdditionalEdits,
207 BufferReloaded,
208 BufferSaved,
209 CreateProjectEntry,
210 RenameProjectEntry,
211 DeleteProjectEntry,
212 Follow,
213 FormatBuffers,
214 GetCodeActions,
215 GetCompletions,
216 GetDefinition,
217 GetDocumentHighlights,
218 GetReferences,
219 GetProjectSymbols,
220 JoinProject,
221 LeaveProject,
222 OpenBufferById,
223 OpenBufferByPath,
224 OpenBufferForSymbol,
225 PerformRename,
226 PrepareRename,
227 ReloadBuffers,
228 RemoveProjectCollaborator,
229 SaveBuffer,
230 SearchProject,
231 StartLanguageServer,
232 Unfollow,
233 UnregisterWorktree,
234 UnshareProject,
235 UpdateBuffer,
236 UpdateBufferFile,
237 UpdateDiagnosticSummary,
238 UpdateFollowers,
239 UpdateLanguageServer,
240 RegisterWorktree,
241 UpdateWorktree,
242);
243
244entity_messages!(channel_id, ChannelMessageSent);
245
246/// A stream of protobuf messages.
247pub struct MessageStream<S> {
248 stream: S,
249 encoding_buffer: Vec<u8>,
250}
251
252#[derive(Debug)]
253pub enum Message {
254 Envelope(Envelope),
255 Ping,
256 Pong,
257}
258
259impl<S> MessageStream<S> {
260 pub fn new(stream: S) -> Self {
261 Self {
262 stream,
263 encoding_buffer: Vec::new(),
264 }
265 }
266
267 pub fn inner_mut(&mut self) -> &mut S {
268 &mut self.stream
269 }
270}
271
272impl<S> MessageStream<S>
273where
274 S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
275{
276 pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
277 #[cfg(any(test, feature = "test-support"))]
278 const COMPRESSION_LEVEL: i32 = -7;
279
280 #[cfg(not(any(test, feature = "test-support")))]
281 const COMPRESSION_LEVEL: i32 = 4;
282
283 match message {
284 Message::Envelope(message) => {
285 self.encoding_buffer.resize(message.encoded_len(), 0);
286 self.encoding_buffer.clear();
287 message
288 .encode(&mut self.encoding_buffer)
289 .map_err(|err| io::Error::from(err))?;
290 let buffer =
291 zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
292 .unwrap();
293 self.stream.send(WebSocketMessage::Binary(buffer)).await?;
294 }
295 Message::Ping => {
296 self.stream
297 .send(WebSocketMessage::Ping(Default::default()))
298 .await?;
299 }
300 Message::Pong => {
301 self.stream
302 .send(WebSocketMessage::Pong(Default::default()))
303 .await?;
304 }
305 }
306
307 Ok(())
308 }
309}
310
311impl<S> MessageStream<S>
312where
313 S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
314{
315 pub async fn read(&mut self) -> Result<Message, anyhow::Error> {
316 while let Some(bytes) = self.stream.next().await {
317 match bytes? {
318 WebSocketMessage::Binary(bytes) => {
319 self.encoding_buffer.clear();
320 zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
321 let envelope = Envelope::decode(self.encoding_buffer.as_slice())
322 .map_err(io::Error::from)?;
323 return Ok(Message::Envelope(envelope));
324 }
325 WebSocketMessage::Ping(_) => return Ok(Message::Ping),
326 WebSocketMessage::Pong(_) => return Ok(Message::Pong),
327 WebSocketMessage::Close(_) => break,
328 _ => {}
329 }
330 }
331 Err(anyhow!("connection closed"))
332 }
333}
334
335impl Into<SystemTime> for Timestamp {
336 fn into(self) -> SystemTime {
337 UNIX_EPOCH
338 .checked_add(Duration::new(self.seconds, self.nanos))
339 .unwrap()
340 }
341}
342
343impl From<SystemTime> for Timestamp {
344 fn from(time: SystemTime) -> Self {
345 let duration = time.duration_since(UNIX_EPOCH).unwrap();
346 Self {
347 seconds: duration.as_secs(),
348 nanos: duration.subsec_nanos(),
349 }
350 }
351}
352
353impl From<u128> for Nonce {
354 fn from(nonce: u128) -> Self {
355 let upper_half = (nonce >> 64) as u64;
356 let lower_half = nonce as u64;
357 Self {
358 upper_half,
359 lower_half,
360 }
361 }
362}
363
364impl From<Nonce> for u128 {
365 fn from(nonce: Nonce) -> Self {
366 let upper_half = (nonce.upper_half as u128) << 64;
367 let lower_half = nonce.lower_half as u128;
368 upper_half | lower_half
369 }
370}