proto.rs

  1use super::{ConnectionId, PeerId, TypedEnvelope};
  2use anyhow::Result;
  3use async_tungstenite::tungstenite::{Error as WebSocketError, Message as WebSocketMessage};
  4use futures::{SinkExt as _, StreamExt as _};
  5use prost::Message;
  6use std::any::{Any, TypeId};
  7use std::{
  8    io,
  9    time::{Duration, SystemTime, UNIX_EPOCH},
 10};
 11
 12include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
 13
 14pub trait EnvelopedMessage: Clone + Sized + Send + Sync + 'static {
 15    const NAME: &'static str;
 16    const PRIORITY: MessagePriority;
 17    fn into_envelope(
 18        self,
 19        id: u32,
 20        responding_to: Option<u32>,
 21        original_sender_id: Option<u32>,
 22    ) -> Envelope;
 23    fn from_envelope(envelope: Envelope) -> Option<Self>;
 24}
 25
 26pub trait EntityMessage: EnvelopedMessage {
 27    fn remote_entity_id(&self) -> u64;
 28}
 29
 30pub trait RequestMessage: EnvelopedMessage {
 31    type Response: EnvelopedMessage;
 32}
 33
 34pub trait AnyTypedEnvelope: 'static + Send + Sync {
 35    fn payload_type_id(&self) -> TypeId;
 36    fn payload_type_name(&self) -> &'static str;
 37    fn as_any(&self) -> &dyn Any;
 38    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
 39    fn is_background(&self) -> bool;
 40    fn original_sender_id(&self) -> Option<PeerId>;
 41}
 42
 43pub enum MessagePriority {
 44    Foreground,
 45    Background,
 46}
 47
 48impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
 49    fn payload_type_id(&self) -> TypeId {
 50        TypeId::of::<T>()
 51    }
 52
 53    fn payload_type_name(&self) -> &'static str {
 54        T::NAME
 55    }
 56
 57    fn as_any(&self) -> &dyn Any {
 58        self
 59    }
 60
 61    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
 62        self
 63    }
 64
 65    fn is_background(&self) -> bool {
 66        matches!(T::PRIORITY, MessagePriority::Background)
 67    }
 68
 69    fn original_sender_id(&self) -> Option<PeerId> {
 70        self.original_sender_id
 71    }
 72}
 73
 74macro_rules! messages {
 75    ($(($name:ident, $priority:ident)),* $(,)?) => {
 76        pub fn build_typed_envelope(sender_id: ConnectionId, envelope: Envelope) -> Option<Box<dyn AnyTypedEnvelope>> {
 77            match envelope.payload {
 78                $(Some(envelope::Payload::$name(payload)) => {
 79                    Some(Box::new(TypedEnvelope {
 80                        sender_id,
 81                        original_sender_id: envelope.original_sender_id.map(PeerId),
 82                        message_id: envelope.id,
 83                        payload,
 84                    }))
 85                }, )*
 86                _ => None
 87            }
 88        }
 89
 90        $(
 91            impl EnvelopedMessage for $name {
 92                const NAME: &'static str = std::stringify!($name);
 93                const PRIORITY: MessagePriority = MessagePriority::$priority;
 94
 95                fn into_envelope(
 96                    self,
 97                    id: u32,
 98                    responding_to: Option<u32>,
 99                    original_sender_id: Option<u32>,
100                ) -> Envelope {
101                    Envelope {
102                        id,
103                        responding_to,
104                        original_sender_id,
105                        payload: Some(envelope::Payload::$name(self)),
106                    }
107                }
108
109                fn from_envelope(envelope: Envelope) -> Option<Self> {
110                    if let Some(envelope::Payload::$name(msg)) = envelope.payload {
111                        Some(msg)
112                    } else {
113                        None
114                    }
115                }
116            }
117        )*
118    };
119}
120
// Given a list of `(Request, Response)` pairs, implements `RequestMessage`
// for each request type with the paired type as its `Response`.
macro_rules! request_messages {
    ($(($request:ident, $response:ident)),* $(,)?) => {
        $(
            impl RequestMessage for $request {
                type Response = $response;
            }
        )*
    };
}
128
// Given the name of an id field followed by a list of message types,
// implements `EntityMessage` for each type by returning that field.
macro_rules! entity_messages {
    ($entity_id_field:ident, $($message:ident),* $(,)?) => {
        $(
            impl EntityMessage for $message {
                fn remote_entity_id(&self) -> u64 {
                    self.$entity_id_field
                }
            }
        )*
    };
}
138
139messages!(
140    (Ack, Foreground),
141    (AddProjectCollaborator, Foreground),
142    (ApplyCodeAction, Foreground),
143    (ApplyCodeActionResponse, Foreground),
144    (ApplyCompletionAdditionalEdits, Foreground),
145    (ApplyCompletionAdditionalEditsResponse, Foreground),
146    (BufferReloaded, Foreground),
147    (BufferSaved, Foreground),
148    (ChannelMessageSent, Foreground),
149    (CloseBuffer, Foreground),
150    (DiskBasedDiagnosticsUpdated, Background),
151    (DiskBasedDiagnosticsUpdating, Background),
152    (Error, Foreground),
153    (FormatBuffers, Foreground),
154    (FormatBuffersResponse, Foreground),
155    (GetChannelMessages, Foreground),
156    (GetChannelMessagesResponse, Foreground),
157    (GetChannels, Foreground),
158    (GetChannelsResponse, Foreground),
159    (GetCodeActions, Background),
160    (GetCodeActionsResponse, Foreground),
161    (GetCompletions, Background),
162    (GetCompletionsResponse, Foreground),
163    (GetDefinition, Foreground),
164    (GetDefinitionResponse, Foreground),
165    (GetDocumentHighlights, Background),
166    (GetDocumentHighlightsResponse, Background),
167    (GetReferences, Foreground),
168    (GetReferencesResponse, Foreground),
169    (GetProjectSymbols, Background),
170    (GetProjectSymbolsResponse, Background),
171    (GetUsers, Foreground),
172    (GetUsersResponse, Foreground),
173    (JoinChannel, Foreground),
174    (JoinChannelResponse, Foreground),
175    (JoinProject, Foreground),
176    (JoinProjectResponse, Foreground),
177    (LeaveChannel, Foreground),
178    (LeaveProject, Foreground),
179    (OpenBuffer, Foreground),
180    (OpenBufferForSymbol, Foreground),
181    (OpenBufferForSymbolResponse, Foreground),
182    (OpenBufferResponse, Foreground),
183    (PerformRename, Background),
184    (PerformRenameResponse, Background),
185    (PrepareRename, Background),
186    (PrepareRenameResponse, Background),
187    (RegisterProjectResponse, Foreground),
188    (Ping, Foreground),
189    (RegisterProject, Foreground),
190    (RegisterWorktree, Foreground),
191    (RemoveProjectCollaborator, Foreground),
192    (SaveBuffer, Foreground),
193    (SearchProject, Foreground),
194    (SearchProjectResponse, Foreground),
195    (SendChannelMessage, Foreground),
196    (SendChannelMessageResponse, Foreground),
197    (ShareProject, Foreground),
198    (Test, Foreground),
199    (UnregisterProject, Foreground),
200    (UnregisterWorktree, Foreground),
201    (UnshareProject, Foreground),
202    (UpdateBuffer, Foreground),
203    (UpdateBufferFile, Foreground),
204    (UpdateContacts, Foreground),
205    (UpdateDiagnosticSummary, Foreground),
206    (UpdateWorktree, Foreground),
207);
208
209request_messages!(
210    (ApplyCodeAction, ApplyCodeActionResponse),
211    (
212        ApplyCompletionAdditionalEdits,
213        ApplyCompletionAdditionalEditsResponse
214    ),
215    (FormatBuffers, FormatBuffersResponse),
216    (GetChannelMessages, GetChannelMessagesResponse),
217    (GetChannels, GetChannelsResponse),
218    (GetCodeActions, GetCodeActionsResponse),
219    (GetCompletions, GetCompletionsResponse),
220    (GetDefinition, GetDefinitionResponse),
221    (GetDocumentHighlights, GetDocumentHighlightsResponse),
222    (GetReferences, GetReferencesResponse),
223    (GetProjectSymbols, GetProjectSymbolsResponse),
224    (GetUsers, GetUsersResponse),
225    (JoinChannel, JoinChannelResponse),
226    (JoinProject, JoinProjectResponse),
227    (OpenBuffer, OpenBufferResponse),
228    (OpenBufferForSymbol, OpenBufferForSymbolResponse),
229    (Ping, Ack),
230    (PerformRename, PerformRenameResponse),
231    (PrepareRename, PrepareRenameResponse),
232    (RegisterProject, RegisterProjectResponse),
233    (RegisterWorktree, Ack),
234    (SaveBuffer, BufferSaved),
235    (SearchProject, SearchProjectResponse),
236    (SendChannelMessage, SendChannelMessageResponse),
237    (ShareProject, Ack),
238    (Test, Test),
239    (UpdateBuffer, Ack),
240    (UpdateWorktree, Ack),
241);
242
243entity_messages!(
244    project_id,
245    AddProjectCollaborator,
246    ApplyCodeAction,
247    ApplyCompletionAdditionalEdits,
248    BufferReloaded,
249    BufferSaved,
250    CloseBuffer,
251    DiskBasedDiagnosticsUpdated,
252    DiskBasedDiagnosticsUpdating,
253    FormatBuffers,
254    GetCodeActions,
255    GetCompletions,
256    GetDefinition,
257    GetDocumentHighlights,
258    GetReferences,
259    GetProjectSymbols,
260    JoinProject,
261    LeaveProject,
262    OpenBuffer,
263    OpenBufferForSymbol,
264    PerformRename,
265    PrepareRename,
266    RemoveProjectCollaborator,
267    SaveBuffer,
268    SearchProject,
269    UnregisterWorktree,
270    UnshareProject,
271    UpdateBuffer,
272    UpdateBufferFile,
273    UpdateDiagnosticSummary,
274    RegisterWorktree,
275    UpdateWorktree,
276);
277
278entity_messages!(channel_id, ChannelMessageSent);
279
/// Wraps an underlying stream `S` so that protobuf messages can be written
/// to and read from it.
pub struct MessageStream<S> {
    /// The wrapped stream.
    stream: S,
    /// Scratch space reused between messages for their serialized bytes.
    encoding_buffer: Vec<u8>,
}

impl<S> MessageStream<S> {
    /// Creates a message stream around `stream`, starting with an empty
    /// scratch buffer.
    pub fn new(stream: S) -> Self {
        let encoding_buffer = Vec::new();
        Self {
            stream,
            encoding_buffer,
        }
    }

    /// Gives mutable access to the wrapped stream.
    pub fn inner_mut(&mut self) -> &mut S {
        let Self { stream, .. } = self;
        stream
    }
}
298
299impl<S> MessageStream<S>
300where
301    S: futures::Sink<WebSocketMessage, Error = WebSocketError> + Unpin,
302{
303    /// Write a given protobuf message to the stream.
304    pub async fn write_message(&mut self, message: &Envelope) -> Result<(), WebSocketError> {
305        #[cfg(any(test, feature = "test-support"))]
306        const COMPRESSION_LEVEL: i32 = -7;
307
308        #[cfg(not(any(test, feature = "test-support")))]
309        const COMPRESSION_LEVEL: i32 = 4;
310
311        self.encoding_buffer.resize(message.encoded_len(), 0);
312        self.encoding_buffer.clear();
313        message
314            .encode(&mut self.encoding_buffer)
315            .map_err(|err| io::Error::from(err))?;
316        let buffer =
317            zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL).unwrap();
318        self.stream.send(WebSocketMessage::Binary(buffer)).await?;
319        Ok(())
320    }
321}
322
323impl<S> MessageStream<S>
324where
325    S: futures::Stream<Item = Result<WebSocketMessage, WebSocketError>> + Unpin,
326{
327    /// Read a protobuf message of the given type from the stream.
328    pub async fn read_message(&mut self) -> Result<Envelope, WebSocketError> {
329        while let Some(bytes) = self.stream.next().await {
330            match bytes? {
331                WebSocketMessage::Binary(bytes) => {
332                    self.encoding_buffer.clear();
333                    zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
334                    let envelope = Envelope::decode(self.encoding_buffer.as_slice())
335                        .map_err(io::Error::from)?;
336                    return Ok(envelope);
337                }
338                WebSocketMessage::Close(_) => break,
339                _ => {}
340            }
341        }
342        Err(WebSocketError::ConnectionClosed)
343    }
344}
345
346impl Into<SystemTime> for Timestamp {
347    fn into(self) -> SystemTime {
348        UNIX_EPOCH
349            .checked_add(Duration::new(self.seconds, self.nanos))
350            .unwrap()
351    }
352}
353
354impl From<SystemTime> for Timestamp {
355    fn from(time: SystemTime) -> Self {
356        let duration = time.duration_since(UNIX_EPOCH).unwrap();
357        Self {
358            seconds: duration.as_secs(),
359            nanos: duration.subsec_nanos(),
360        }
361    }
362}
363
364impl From<u128> for Nonce {
365    fn from(nonce: u128) -> Self {
366        let upper_half = (nonce >> 64) as u64;
367        let lower_half = nonce as u64;
368        Self {
369            upper_half,
370            lower_half,
371        }
372    }
373}
374
375impl From<Nonce> for u128 {
376    fn from(nonce: Nonce) -> Self {
377        let upper_half = (nonce.upper_half as u128) << 64;
378        let lower_half = nonce.lower_half as u128;
379        upper_half | lower_half
380    }
381}