proto.rs

  1use super::{entity_messages, messages, request_messages, ConnectionId, PeerId, TypedEnvelope};
  2use anyhow::{anyhow, Result};
  3use async_tungstenite::tungstenite::Message as WebSocketMessage;
  4use futures::{SinkExt as _, StreamExt as _};
  5use prost::Message as _;
  6use serde::Serialize;
  7use std::any::{Any, TypeId};
  8use std::{
  9    fmt::Debug,
 10    io,
 11    time::{Duration, SystemTime, UNIX_EPOCH},
 12};
 13
// Textually include `$OUT_DIR/zed.messages.rs` into this module; `OUT_DIR` is
// resolved at compile time through `env!`.
// NOTE(review): this is presumably the build-script-generated protobuf code that
// declares `Envelope`, `Timestamp`, `Nonce` and the message structs referenced
// below — confirm against the crate's build script.
include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
 15
/// A message payload that can be wrapped in, and recovered from, an [`Envelope`].
///
/// NOTE(review): no implementation is written by hand in this file; the
/// `messages!` invocation presumably generates one per listed type — confirm
/// against the macro definition.
pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
    /// Name of the message type; this is what
    /// [`AnyTypedEnvelope::payload_type_name`] returns for a `TypedEnvelope` of
    /// this type.
    const NAME: &'static str;
    /// Priority class of the message type; [`AnyTypedEnvelope::is_background`]
    /// reports whether this is [`MessagePriority::Background`].
    const PRIORITY: MessagePriority;
    /// Consumes the message and wraps it in an [`Envelope`].
    ///
    /// The three arguments are envelope metadata. Judging by their names they
    /// are the envelope's own id, the id of the message being answered (if
    /// any), and the id of the original sender (if any) — confirm against the
    /// generated implementation.
    fn into_envelope(
        self,
        id: u32,
        responding_to: Option<u32>,
        original_sender_id: Option<u32>,
    ) -> Envelope;
    /// Tries to extract a message of this type from `envelope`.
    ///
    /// Returns `None` when no such message can be produced — presumably when
    /// the envelope carries a different payload type (confirm).
    fn from_envelope(envelope: Envelope) -> Option<Self>;
}
 27
/// An [`EnvelopedMessage`] that identifies a remote entity by a 64-bit id.
pub trait EntityMessage: EnvelopedMessage {
    /// Returns the id of the remote entity this message refers to.
    ///
    /// NOTE(review): the `entity_messages!` invocations in this file name a
    /// field (`project_id`, `channel_id`) for each group of messages, which
    /// presumably backs this method — confirm against the macro definition.
    fn remote_entity_id(&self) -> u64;
}
 31
/// An [`EnvelopedMessage`] that has an associated response message type.
pub trait RequestMessage: EnvelopedMessage {
    /// The message type associated with this request as its response.
    type Response: EnvelopedMessage;
}
 35
/// A type-erased view of a `TypedEnvelope<T>`.
///
/// The only implementation in this file is the blanket one for
/// `TypedEnvelope<T>` where `T: EnvelopedMessage`; the method descriptions
/// below reflect that implementation.
pub trait AnyTypedEnvelope: 'static + Send + Sync {
    /// Returns the [`TypeId`] of the payload type (not of the envelope).
    fn payload_type_id(&self) -> TypeId;
    /// Returns the payload type's [`EnvelopedMessage::NAME`].
    fn payload_type_name(&self) -> &'static str;
    /// Borrows the envelope as [`Any`] so it can be downcast by reference.
    fn as_any(&self) -> &dyn Any;
    /// Converts the boxed envelope into a boxed [`Any`] so it can be downcast
    /// by value.
    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
    /// Returns `true` when the payload type's priority is
    /// [`MessagePriority::Background`].
    fn is_background(&self) -> bool;
    /// Returns the envelope's `original_sender_id`, if it has one.
    fn original_sender_id(&self) -> Option<PeerId>;
}
 44
 45pub enum MessagePriority {
 46    Foreground,
 47    Background,
 48}
 49
 50impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
 51    fn payload_type_id(&self) -> TypeId {
 52        TypeId::of::<T>()
 53    }
 54
 55    fn payload_type_name(&self) -> &'static str {
 56        T::NAME
 57    }
 58
 59    fn as_any(&self) -> &dyn Any {
 60        self
 61    }
 62
 63    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
 64        self
 65    }
 66
 67    fn is_background(&self) -> bool {
 68        matches!(T::PRIORITY, MessagePriority::Background)
 69    }
 70
 71    fn original_sender_id(&self) -> Option<PeerId> {
 72        self.original_sender_id
 73    }
 74}
 75
// Declares every message type known to this module together with its priority.
// Each entry is a `(MessageType, Priority)` pair where the priority is one of
// the `MessagePriority` variant names.
// NOTE(review): `messages!` is imported from the parent module and presumably
// expands to an `EnvelopedMessage` impl per entry; the expansion may depend on
// entry order, so keep the list order stable unless the macro is checked.
messages!(
    (Ack, Foreground),
    (AddProjectCollaborator, Foreground),
    (ApplyCodeAction, Background),
    (ApplyCodeActionResponse, Background),
    (ApplyCompletionAdditionalEdits, Background),
    (ApplyCompletionAdditionalEditsResponse, Background),
    (BufferReloaded, Foreground),
    (BufferSaved, Foreground),
    (RemoveContact, Foreground),
    (ChannelMessageSent, Foreground),
    (CreateProjectEntry, Foreground),
    (DeleteProjectEntry, Foreground),
    (Error, Foreground),
    (Follow, Foreground),
    (FollowResponse, Foreground),
    (FormatBuffers, Foreground),
    (FormatBuffersResponse, Foreground),
    (FuzzySearchUsers, Foreground),
    (GetChannelMessages, Foreground),
    (GetChannelMessagesResponse, Foreground),
    (GetChannels, Foreground),
    (GetChannelsResponse, Foreground),
    (GetCodeActions, Background),
    (GetCodeActionsResponse, Background),
    (GetCompletions, Background),
    (GetCompletionsResponse, Background),
    (GetDefinition, Background),
    (GetDefinitionResponse, Background),
    (GetDocumentHighlights, Background),
    (GetDocumentHighlightsResponse, Background),
    (GetReferences, Background),
    (GetReferencesResponse, Background),
    (GetProjectSymbols, Background),
    (GetProjectSymbolsResponse, Background),
    (GetUsers, Foreground),
    (UsersResponse, Foreground),
    (JoinChannel, Foreground),
    (JoinChannelResponse, Foreground),
    (JoinProject, Foreground),
    (JoinProjectResponse, Foreground),
    (LeaveChannel, Foreground),
    (LeaveProject, Foreground),
    (OpenBufferById, Background),
    (OpenBufferByPath, Background),
    (OpenBufferForSymbol, Background),
    (OpenBufferForSymbolResponse, Background),
    (OpenBufferResponse, Background),
    (PerformRename, Background),
    (PerformRenameResponse, Background),
    (PrepareRename, Background),
    (PrepareRenameResponse, Background),
    (ProjectEntryResponse, Foreground),
    (RegisterProjectResponse, Foreground),
    (Ping, Foreground),
    (RegisterProject, Foreground),
    (RegisterWorktree, Foreground),
    (ReloadBuffers, Foreground),
    (ReloadBuffersResponse, Foreground),
    (RemoveProjectCollaborator, Foreground),
    (RenameProjectEntry, Foreground),
    (RequestContact, Foreground),
    (RespondToContactRequest, Foreground),
    (SaveBuffer, Foreground),
    (SearchProject, Background),
    (SearchProjectResponse, Background),
    (SendChannelMessage, Foreground),
    (SendChannelMessageResponse, Foreground),
    (ShareProject, Foreground),
    (StartLanguageServer, Foreground),
    (Test, Foreground),
    (Unfollow, Foreground),
    (UnregisterProject, Foreground),
    (UnregisterWorktree, Foreground),
    (UnshareProject, Foreground),
    (UpdateBuffer, Foreground),
    (UpdateBufferFile, Foreground),
    (UpdateContacts, Foreground),
    (UpdateDiagnosticSummary, Foreground),
    (UpdateFollowers, Foreground),
    (UpdateLanguageServer, Foreground),
    (UpdateWorktree, Foreground),
);
159
// Pairs each request message type with the message type sent back in reply,
// as `(Request, Response)`. Several requests share a response type (`Ack`,
// `UsersResponse`, `ProjectEntryResponse`, `OpenBufferResponse`), and `Test`
// is answered with another `Test`.
// NOTE(review): `request_messages!` is imported from the parent module and
// presumably expands to a `RequestMessage` impl per pair — confirm against the
// macro definition.
request_messages!(
    (ApplyCodeAction, ApplyCodeActionResponse),
    (
        ApplyCompletionAdditionalEdits,
        ApplyCompletionAdditionalEditsResponse
    ),
    (CreateProjectEntry, ProjectEntryResponse),
    (DeleteProjectEntry, ProjectEntryResponse),
    (Follow, FollowResponse),
    (FormatBuffers, FormatBuffersResponse),
    (GetChannelMessages, GetChannelMessagesResponse),
    (GetChannels, GetChannelsResponse),
    (GetCodeActions, GetCodeActionsResponse),
    (GetCompletions, GetCompletionsResponse),
    (GetDefinition, GetDefinitionResponse),
    (GetDocumentHighlights, GetDocumentHighlightsResponse),
    (GetReferences, GetReferencesResponse),
    (GetProjectSymbols, GetProjectSymbolsResponse),
    (FuzzySearchUsers, UsersResponse),
    (GetUsers, UsersResponse),
    (JoinChannel, JoinChannelResponse),
    (JoinProject, JoinProjectResponse),
    (OpenBufferById, OpenBufferResponse),
    (OpenBufferByPath, OpenBufferResponse),
    (OpenBufferForSymbol, OpenBufferForSymbolResponse),
    (Ping, Ack),
    (PerformRename, PerformRenameResponse),
    (PrepareRename, PrepareRenameResponse),
    (RegisterProject, RegisterProjectResponse),
    (RegisterWorktree, Ack),
    (ReloadBuffers, ReloadBuffersResponse),
    (RequestContact, Ack),
    (RemoveContact, Ack),
    (RespondToContactRequest, Ack),
    (RenameProjectEntry, ProjectEntryResponse),
    (SaveBuffer, BufferSaved),
    (SearchProject, SearchProjectResponse),
    (SendChannelMessage, SendChannelMessageResponse),
    (ShareProject, Ack),
    (Test, Test),
    (UpdateBuffer, Ack),
    (UpdateWorktree, Ack),
);
203
// Lists the message types grouped under `project_id`: the first argument is a
// field name and the remaining arguments are message types.
// NOTE(review): `entity_messages!` is imported from the parent module and
// presumably expands to an `EntityMessage` impl per type whose
// `remote_entity_id` returns the named field — confirm against the macro
// definition.
entity_messages!(
    project_id,
    AddProjectCollaborator,
    ApplyCodeAction,
    ApplyCompletionAdditionalEdits,
    BufferReloaded,
    BufferSaved,
    CreateProjectEntry,
    RenameProjectEntry,
    DeleteProjectEntry,
    Follow,
    FormatBuffers,
    GetCodeActions,
    GetCompletions,
    GetDefinition,
    GetDocumentHighlights,
    GetReferences,
    GetProjectSymbols,
    JoinProject,
    LeaveProject,
    OpenBufferById,
    OpenBufferByPath,
    OpenBufferForSymbol,
    PerformRename,
    PrepareRename,
    ReloadBuffers,
    RemoveProjectCollaborator,
    SaveBuffer,
    SearchProject,
    StartLanguageServer,
    Unfollow,
    UnregisterWorktree,
    UnshareProject,
    UpdateBuffer,
    UpdateBufferFile,
    UpdateDiagnosticSummary,
    UpdateFollowers,
    UpdateLanguageServer,
    RegisterWorktree,
    UpdateWorktree,
);
245
// Same macro as the `project_id` group, here naming the `channel_id` field for
// the single message type `ChannelMessageSent`.
entity_messages!(channel_id, ChannelMessageSent);
247
/// A stream of protobuf messages.
///
/// Wraps a transport `S` that carries WebSocket messages. `write` is available
/// when `S` is a `Sink` of WebSocket messages and `read` when `S` is a `Stream`
/// of them.
pub struct MessageStream<S> {
    /// The wrapped transport.
    stream: S,
    /// Scratch buffer reused across calls: `write` fills it with the
    /// protobuf-encoded envelope before compressing, and `read` fills it with
    /// the decompressed bytes before decoding.
    encoding_buffer: Vec<u8>,
}
253
/// A message read from or written to a `MessageStream`.
#[derive(Debug)]
pub enum Message {
    /// A protobuf `Envelope`; `MessageStream` transfers it as a zstd-compressed
    /// binary WebSocket message.
    Envelope(Envelope),
    /// Corresponds to a WebSocket ping message.
    Ping,
    /// Corresponds to a WebSocket pong message.
    Pong,
}
260
261impl<S> MessageStream<S> {
262    pub fn new(stream: S) -> Self {
263        Self {
264            stream,
265            encoding_buffer: Vec::new(),
266        }
267    }
268
269    pub fn inner_mut(&mut self) -> &mut S {
270        &mut self.stream
271    }
272}
273
274impl<S> MessageStream<S>
275where
276    S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
277{
278    pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
279        #[cfg(any(test, feature = "test-support"))]
280        const COMPRESSION_LEVEL: i32 = -7;
281
282        #[cfg(not(any(test, feature = "test-support")))]
283        const COMPRESSION_LEVEL: i32 = 4;
284
285        match message {
286            Message::Envelope(message) => {
287                self.encoding_buffer.resize(message.encoded_len(), 0);
288                self.encoding_buffer.clear();
289                message
290                    .encode(&mut self.encoding_buffer)
291                    .map_err(|err| io::Error::from(err))?;
292                let buffer =
293                    zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
294                        .unwrap();
295                self.stream.send(WebSocketMessage::Binary(buffer)).await?;
296            }
297            Message::Ping => {
298                self.stream
299                    .send(WebSocketMessage::Ping(Default::default()))
300                    .await?;
301            }
302            Message::Pong => {
303                self.stream
304                    .send(WebSocketMessage::Pong(Default::default()))
305                    .await?;
306            }
307        }
308
309        Ok(())
310    }
311}
312
313impl<S> MessageStream<S>
314where
315    S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
316{
317    pub async fn read(&mut self) -> Result<Message, anyhow::Error> {
318        while let Some(bytes) = self.stream.next().await {
319            match bytes? {
320                WebSocketMessage::Binary(bytes) => {
321                    self.encoding_buffer.clear();
322                    zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
323                    let envelope = Envelope::decode(self.encoding_buffer.as_slice())
324                        .map_err(io::Error::from)?;
325                    return Ok(Message::Envelope(envelope));
326                }
327                WebSocketMessage::Ping(_) => return Ok(Message::Ping),
328                WebSocketMessage::Pong(_) => return Ok(Message::Pong),
329                WebSocketMessage::Close(_) => break,
330                _ => {}
331            }
332        }
333        Err(anyhow!("connection closed"))
334    }
335}
336
337impl Into<SystemTime> for Timestamp {
338    fn into(self) -> SystemTime {
339        UNIX_EPOCH
340            .checked_add(Duration::new(self.seconds, self.nanos))
341            .unwrap()
342    }
343}
344
345impl From<SystemTime> for Timestamp {
346    fn from(time: SystemTime) -> Self {
347        let duration = time.duration_since(UNIX_EPOCH).unwrap();
348        Self {
349            seconds: duration.as_secs(),
350            nanos: duration.subsec_nanos(),
351        }
352    }
353}
354
355impl From<u128> for Nonce {
356    fn from(nonce: u128) -> Self {
357        let upper_half = (nonce >> 64) as u64;
358        let lower_half = nonce as u64;
359        Self {
360            upper_half,
361            lower_half,
362        }
363    }
364}
365
366impl From<Nonce> for u128 {
367    fn from(nonce: Nonce) -> Self {
368        let upper_half = (nonce.upper_half as u128) << 64;
369        let lower_half = nonce.lower_half as u128;
370        upper_half | lower_half
371    }
372}