proto.rs

  1use super::{ConnectionId, PeerId, TypedEnvelope};
  2use anyhow::Result;
  3use async_tungstenite::tungstenite::{Error as WebSocketError, Message as WebSocketMessage};
  4use futures::{SinkExt as _, StreamExt as _};
  5use prost::Message;
  6use std::any::{Any, TypeId};
  7use std::{
  8    io,
  9    time::{Duration, SystemTime, UNIX_EPOCH},
 10};
 11
 12include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
 13
 14pub trait EnvelopedMessage: Clone + Sized + Send + Sync + 'static {
 15    const NAME: &'static str;
 16    const PRIORITY: MessagePriority;
 17    fn into_envelope(
 18        self,
 19        id: u32,
 20        responding_to: Option<u32>,
 21        original_sender_id: Option<u32>,
 22    ) -> Envelope;
 23    fn from_envelope(envelope: Envelope) -> Option<Self>;
 24}
 25
 26pub trait EntityMessage: EnvelopedMessage {
 27    fn remote_entity_id(&self) -> u64;
 28}
 29
 30pub trait RequestMessage: EnvelopedMessage {
 31    type Response: EnvelopedMessage;
 32}
 33
 34pub trait AnyTypedEnvelope: 'static + Send + Sync {
 35    fn payload_type_id(&self) -> TypeId;
 36    fn payload_type_name(&self) -> &'static str;
 37    fn as_any(&self) -> &dyn Any;
 38    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
 39    fn is_background(&self) -> bool;
 40    fn original_sender_id(&self) -> Option<PeerId>;
 41}
 42
 43pub enum MessagePriority {
 44    Foreground,
 45    Background,
 46}
 47
 48impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
 49    fn payload_type_id(&self) -> TypeId {
 50        TypeId::of::<T>()
 51    }
 52
 53    fn payload_type_name(&self) -> &'static str {
 54        T::NAME
 55    }
 56
 57    fn as_any(&self) -> &dyn Any {
 58        self
 59    }
 60
 61    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
 62        self
 63    }
 64
 65    fn is_background(&self) -> bool {
 66        matches!(T::PRIORITY, MessagePriority::Background)
 67    }
 68
 69    fn original_sender_id(&self) -> Option<PeerId> {
 70        self.original_sender_id
 71    }
 72}
 73
 74macro_rules! messages {
 75    ($(($name:ident, $priority:ident)),* $(,)?) => {
 76        pub fn build_typed_envelope(sender_id: ConnectionId, envelope: Envelope) -> Option<Box<dyn AnyTypedEnvelope>> {
 77            match envelope.payload {
 78                $(Some(envelope::Payload::$name(payload)) => {
 79                    Some(Box::new(TypedEnvelope {
 80                        sender_id,
 81                        original_sender_id: envelope.original_sender_id.map(PeerId),
 82                        message_id: envelope.id,
 83                        payload,
 84                    }))
 85                }, )*
 86                _ => None
 87            }
 88        }
 89
 90        $(
 91            impl EnvelopedMessage for $name {
 92                const NAME: &'static str = std::stringify!($name);
 93                const PRIORITY: MessagePriority = MessagePriority::$priority;
 94
 95                fn into_envelope(
 96                    self,
 97                    id: u32,
 98                    responding_to: Option<u32>,
 99                    original_sender_id: Option<u32>,
100                ) -> Envelope {
101                    Envelope {
102                        id,
103                        responding_to,
104                        original_sender_id,
105                        payload: Some(envelope::Payload::$name(self)),
106                    }
107                }
108
109                fn from_envelope(envelope: Envelope) -> Option<Self> {
110                    if let Some(envelope::Payload::$name(msg)) = envelope.payload {
111                        Some(msg)
112                    } else {
113                        None
114                    }
115                }
116            }
117        )*
118    };
119}
120
/// Implements `RequestMessage` for each `(Request, Response)` pair, setting the
/// request's associated `Response` type to the paired message type.
macro_rules! request_messages {
    ($(($request:ident, $response:ident)),* $(,)?) => {
        $(
            impl RequestMessage for $request {
                type Response = $response;
            }
        )*
    };
}
128
/// Implements `EntityMessage` for each listed message type, using the field
/// named by the first argument as the remote entity id.
macro_rules! entity_messages {
    ($id_field:ident, $($message:ident),* $(,)?) => {
        $(
            impl EntityMessage for $message {
                fn remote_entity_id(&self) -> u64 {
                    self.$id_field
                }
            }
        )*
    };
}
138
139messages!(
140    (Ack, Foreground),
141    (AddProjectCollaborator, Foreground),
142    (ApplyCodeAction, Foreground),
143    (ApplyCodeActionResponse, Foreground),
144    (ApplyCompletionAdditionalEdits, Foreground),
145    (ApplyCompletionAdditionalEditsResponse, Foreground),
146    (BufferReloaded, Foreground),
147    (BufferSaved, Foreground),
148    (ChannelMessageSent, Foreground),
149    (CloseBuffer, Foreground),
150    (DiskBasedDiagnosticsUpdated, Background),
151    (DiskBasedDiagnosticsUpdating, Background),
152    (Error, Foreground),
153    (FormatBuffers, Foreground),
154    (FormatBuffersResponse, Foreground),
155    (GetChannelMessages, Foreground),
156    (GetChannelMessagesResponse, Foreground),
157    (GetChannels, Foreground),
158    (GetChannelsResponse, Foreground),
159    (GetCodeActions, Background),
160    (GetCodeActionsResponse, Foreground),
161    (GetCompletions, Background),
162    (GetCompletionsResponse, Foreground),
163    (GetDefinition, Foreground),
164    (GetDefinitionResponse, Foreground),
165    (GetDocumentHighlights, Background),
166    (GetDocumentHighlightsResponse, Background),
167    (GetReferences, Foreground),
168    (GetReferencesResponse, Foreground),
169    (GetProjectSymbols, Background),
170    (GetProjectSymbolsResponse, Background),
171    (GetUsers, Foreground),
172    (GetUsersResponse, Foreground),
173    (JoinChannel, Foreground),
174    (JoinChannelResponse, Foreground),
175    (JoinProject, Foreground),
176    (JoinProjectResponse, Foreground),
177    (LeaveChannel, Foreground),
178    (LeaveProject, Foreground),
179    (OpenBuffer, Foreground),
180    (OpenBufferForSymbol, Foreground),
181    (OpenBufferForSymbolResponse, Foreground),
182    (OpenBufferResponse, Foreground),
183    (PerformRename, Background),
184    (PerformRenameResponse, Background),
185    (PrepareRename, Background),
186    (PrepareRenameResponse, Background),
187    (RegisterProjectResponse, Foreground),
188    (Ping, Foreground),
189    (RegisterProject, Foreground),
190    (RegisterWorktree, Foreground),
191    (RemoveProjectCollaborator, Foreground),
192    (SaveBuffer, Foreground),
193    (SendChannelMessage, Foreground),
194    (SendChannelMessageResponse, Foreground),
195    (ShareProject, Foreground),
196    (Test, Foreground),
197    (UnregisterProject, Foreground),
198    (UnregisterWorktree, Foreground),
199    (UnshareProject, Foreground),
200    (UpdateBuffer, Foreground),
201    (UpdateBufferFile, Foreground),
202    (UpdateContacts, Foreground),
203    (UpdateDiagnosticSummary, Foreground),
204    (UpdateWorktree, Foreground),
205);
206
207request_messages!(
208    (ApplyCodeAction, ApplyCodeActionResponse),
209    (
210        ApplyCompletionAdditionalEdits,
211        ApplyCompletionAdditionalEditsResponse
212    ),
213    (FormatBuffers, FormatBuffersResponse),
214    (GetChannelMessages, GetChannelMessagesResponse),
215    (GetChannels, GetChannelsResponse),
216    (GetCodeActions, GetCodeActionsResponse),
217    (GetCompletions, GetCompletionsResponse),
218    (GetDefinition, GetDefinitionResponse),
219    (GetDocumentHighlights, GetDocumentHighlightsResponse),
220    (GetReferences, GetReferencesResponse),
221    (GetProjectSymbols, GetProjectSymbolsResponse),
222    (GetUsers, GetUsersResponse),
223    (JoinChannel, JoinChannelResponse),
224    (JoinProject, JoinProjectResponse),
225    (OpenBuffer, OpenBufferResponse),
226    (OpenBufferForSymbol, OpenBufferForSymbolResponse),
227    (Ping, Ack),
228    (PerformRename, PerformRenameResponse),
229    (PrepareRename, PrepareRenameResponse),
230    (RegisterProject, RegisterProjectResponse),
231    (RegisterWorktree, Ack),
232    (SaveBuffer, BufferSaved),
233    (SendChannelMessage, SendChannelMessageResponse),
234    (ShareProject, Ack),
235    (Test, Test),
236    (UpdateBuffer, Ack),
237    (UpdateWorktree, Ack),
238);
239
240entity_messages!(
241    project_id,
242    AddProjectCollaborator,
243    ApplyCodeAction,
244    ApplyCompletionAdditionalEdits,
245    BufferReloaded,
246    BufferSaved,
247    CloseBuffer,
248    DiskBasedDiagnosticsUpdated,
249    DiskBasedDiagnosticsUpdating,
250    FormatBuffers,
251    GetCodeActions,
252    GetCompletions,
253    GetDefinition,
254    GetDocumentHighlights,
255    GetReferences,
256    GetProjectSymbols,
257    JoinProject,
258    LeaveProject,
259    OpenBuffer,
260    OpenBufferForSymbol,
261    PerformRename,
262    PrepareRename,
263    RemoveProjectCollaborator,
264    SaveBuffer,
265    UnregisterWorktree,
266    UnshareProject,
267    UpdateBuffer,
268    UpdateBufferFile,
269    UpdateDiagnosticSummary,
270    RegisterWorktree,
271    UpdateWorktree,
272);
273
274entity_messages!(channel_id, ChannelMessageSent);
275
/// A stream of protobuf messages.
///
/// Wraps an underlying transport `S` together with a scratch buffer that the
/// read and write paths reuse between calls.
pub struct MessageStream<S> {
    /// The wrapped transport that frames are sent to or received from.
    stream: S,
    /// Scratch space holding the serialized, uncompressed bytes of the message
    /// currently being written or read.
    encoding_buffer: Vec<u8>,
}
281
282impl<S> MessageStream<S> {
283    pub fn new(stream: S) -> Self {
284        Self {
285            stream,
286            encoding_buffer: Vec::new(),
287        }
288    }
289
290    pub fn inner_mut(&mut self) -> &mut S {
291        &mut self.stream
292    }
293}
294
295impl<S> MessageStream<S>
296where
297    S: futures::Sink<WebSocketMessage, Error = WebSocketError> + Unpin,
298{
299    /// Write a given protobuf message to the stream.
300    pub async fn write_message(&mut self, message: &Envelope) -> Result<(), WebSocketError> {
301        #[cfg(any(test, feature = "test-support"))]
302        const COMPRESSION_LEVEL: i32 = -7;
303
304        #[cfg(not(any(test, feature = "test-support")))]
305        const COMPRESSION_LEVEL: i32 = 4;
306
307        self.encoding_buffer.resize(message.encoded_len(), 0);
308        self.encoding_buffer.clear();
309        message
310            .encode(&mut self.encoding_buffer)
311            .map_err(|err| io::Error::from(err))?;
312        let buffer =
313            zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL).unwrap();
314        self.stream.send(WebSocketMessage::Binary(buffer)).await?;
315        Ok(())
316    }
317}
318
319impl<S> MessageStream<S>
320where
321    S: futures::Stream<Item = Result<WebSocketMessage, WebSocketError>> + Unpin,
322{
323    /// Read a protobuf message of the given type from the stream.
324    pub async fn read_message(&mut self) -> Result<Envelope, WebSocketError> {
325        while let Some(bytes) = self.stream.next().await {
326            match bytes? {
327                WebSocketMessage::Binary(bytes) => {
328                    self.encoding_buffer.clear();
329                    zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
330                    let envelope = Envelope::decode(self.encoding_buffer.as_slice())
331                        .map_err(io::Error::from)?;
332                    return Ok(envelope);
333                }
334                WebSocketMessage::Close(_) => break,
335                _ => {}
336            }
337        }
338        Err(WebSocketError::ConnectionClosed)
339    }
340}
341
342impl Into<SystemTime> for Timestamp {
343    fn into(self) -> SystemTime {
344        UNIX_EPOCH
345            .checked_add(Duration::new(self.seconds, self.nanos))
346            .unwrap()
347    }
348}
349
350impl From<SystemTime> for Timestamp {
351    fn from(time: SystemTime) -> Self {
352        let duration = time.duration_since(UNIX_EPOCH).unwrap();
353        Self {
354            seconds: duration.as_secs(),
355            nanos: duration.subsec_nanos(),
356        }
357    }
358}
359
360impl From<u128> for Nonce {
361    fn from(nonce: u128) -> Self {
362        let upper_half = (nonce >> 64) as u64;
363        let lower_half = nonce as u64;
364        Self {
365            upper_half,
366            lower_half,
367        }
368    }
369}
370
371impl From<Nonce> for u128 {
372    fn from(nonce: Nonce) -> Self {
373        let upper_half = (nonce.upper_half as u128) << 64;
374        let lower_half = nonce.lower_half as u128;
375        upper_half | lower_half
376    }
377}