proto.rs

  1use super::{entity_messages, messages, request_messages, ConnectionId, PeerId, TypedEnvelope};
  2use anyhow::{anyhow, Result};
  3use async_tungstenite::tungstenite::Message as WebSocketMessage;
  4use futures::{SinkExt as _, StreamExt as _};
  5use prost::Message as _;
  6use serde::Serialize;
  7use std::any::{Any, TypeId};
  8use std::{
  9    fmt::Debug,
 10    io,
 11    time::{Duration, SystemTime, UNIX_EPOCH},
 12};
 13
 14include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
 15
 16pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
 17    const NAME: &'static str;
 18    const PRIORITY: MessagePriority;
 19    fn into_envelope(
 20        self,
 21        id: u32,
 22        responding_to: Option<u32>,
 23        original_sender_id: Option<u32>,
 24    ) -> Envelope;
 25    fn from_envelope(envelope: Envelope) -> Option<Self>;
 26}
 27
 28pub trait EntityMessage: EnvelopedMessage {
 29    fn remote_entity_id(&self) -> u64;
 30}
 31
 32pub trait RequestMessage: EnvelopedMessage {
 33    type Response: EnvelopedMessage;
 34}
 35
 36pub trait AnyTypedEnvelope: 'static + Send + Sync {
 37    fn payload_type_id(&self) -> TypeId;
 38    fn payload_type_name(&self) -> &'static str;
 39    fn as_any(&self) -> &dyn Any;
 40    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
 41    fn is_background(&self) -> bool;
 42    fn original_sender_id(&self) -> Option<PeerId>;
 43}
 44
 45pub enum MessagePriority {
 46    Foreground,
 47    Background,
 48}
 49
 50impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
 51    fn payload_type_id(&self) -> TypeId {
 52        TypeId::of::<T>()
 53    }
 54
 55    fn payload_type_name(&self) -> &'static str {
 56        T::NAME
 57    }
 58
 59    fn as_any(&self) -> &dyn Any {
 60        self
 61    }
 62
 63    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
 64        self
 65    }
 66
 67    fn is_background(&self) -> bool {
 68        matches!(T::PRIORITY, MessagePriority::Background)
 69    }
 70
 71    fn original_sender_id(&self) -> Option<PeerId> {
 72        self.original_sender_id
 73    }
 74}
 75
 76messages!(
 77    (Ack, Foreground),
 78    (AddProjectCollaborator, Foreground),
 79    (ApplyCodeAction, Background),
 80    (ApplyCodeActionResponse, Background),
 81    (ApplyCompletionAdditionalEdits, Background),
 82    (ApplyCompletionAdditionalEditsResponse, Background),
 83    (BufferReloaded, Foreground),
 84    (BufferSaved, Foreground),
 85    (RemoveContact, Foreground),
 86    (ChannelMessageSent, Foreground),
 87    (CreateProjectEntry, Foreground),
 88    (DeleteProjectEntry, Foreground),
 89    (Error, Foreground),
 90    (Follow, Foreground),
 91    (FollowResponse, Foreground),
 92    (FormatBuffers, Foreground),
 93    (FormatBuffersResponse, Foreground),
 94    (FuzzySearchUsers, Foreground),
 95    (GetChannelMessages, Foreground),
 96    (GetChannelMessagesResponse, Foreground),
 97    (GetChannels, Foreground),
 98    (GetChannelsResponse, Foreground),
 99    (GetCodeActions, Background),
100    (GetCodeActionsResponse, Background),
101    (GetCompletions, Background),
102    (GetCompletionsResponse, Background),
103    (GetDefinition, Background),
104    (GetDefinitionResponse, Background),
105    (GetDocumentHighlights, Background),
106    (GetDocumentHighlightsResponse, Background),
107    (GetReferences, Background),
108    (GetReferencesResponse, Background),
109    (GetProjectSymbols, Background),
110    (GetProjectSymbolsResponse, Background),
111    (GetUsers, Foreground),
112    (UsersResponse, Foreground),
113    (JoinChannel, Foreground),
114    (JoinChannelResponse, Foreground),
115    (JoinProject, Foreground),
116    (JoinProjectResponse, Foreground),
117    (JoinProjectRequestCancelled, Foreground),
118    (LeaveChannel, Foreground),
119    (LeaveProject, Foreground),
120    (OpenBufferById, Background),
121    (OpenBufferByPath, Background),
122    (OpenBufferForSymbol, Background),
123    (OpenBufferForSymbolResponse, Background),
124    (OpenBufferResponse, Background),
125    (PerformRename, Background),
126    (PerformRenameResponse, Background),
127    (PrepareRename, Background),
128    (PrepareRenameResponse, Background),
129    (ProjectEntryResponse, Foreground),
130    (RegisterProjectResponse, Foreground),
131    (Ping, Foreground),
132    (ProjectUnshared, Foreground),
133    (RegisterProject, Foreground),
134    (RegisterWorktree, Foreground),
135    (ReloadBuffers, Foreground),
136    (ReloadBuffersResponse, Foreground),
137    (RemoveProjectCollaborator, Foreground),
138    (RenameProjectEntry, Foreground),
139    (RequestContact, Foreground),
140    (RequestJoinProject, Foreground),
141    (RespondToContactRequest, Foreground),
142    (RespondToJoinProjectRequest, Foreground),
143    (SaveBuffer, Foreground),
144    (SearchProject, Background),
145    (SearchProjectResponse, Background),
146    (SendChannelMessage, Foreground),
147    (SendChannelMessageResponse, Foreground),
148    (StartLanguageServer, Foreground),
149    (Test, Foreground),
150    (Unfollow, Foreground),
151    (UnregisterProject, Foreground),
152    (UnregisterWorktree, Foreground),
153    (UpdateBuffer, Foreground),
154    (UpdateBufferFile, Foreground),
155    (UpdateContacts, Foreground),
156    (UpdateDiagnosticSummary, Foreground),
157    (UpdateFollowers, Foreground),
158    (UpdateLanguageServer, Foreground),
159    (UpdateWorktree, Foreground),
160);
161
162request_messages!(
163    (ApplyCodeAction, ApplyCodeActionResponse),
164    (
165        ApplyCompletionAdditionalEdits,
166        ApplyCompletionAdditionalEditsResponse
167    ),
168    (CreateProjectEntry, ProjectEntryResponse),
169    (DeleteProjectEntry, ProjectEntryResponse),
170    (Follow, FollowResponse),
171    (FormatBuffers, FormatBuffersResponse),
172    (GetChannelMessages, GetChannelMessagesResponse),
173    (GetChannels, GetChannelsResponse),
174    (GetCodeActions, GetCodeActionsResponse),
175    (GetCompletions, GetCompletionsResponse),
176    (GetDefinition, GetDefinitionResponse),
177    (GetDocumentHighlights, GetDocumentHighlightsResponse),
178    (GetReferences, GetReferencesResponse),
179    (GetProjectSymbols, GetProjectSymbolsResponse),
180    (FuzzySearchUsers, UsersResponse),
181    (GetUsers, UsersResponse),
182    (JoinChannel, JoinChannelResponse),
183    (JoinProject, JoinProjectResponse),
184    (OpenBufferById, OpenBufferResponse),
185    (OpenBufferByPath, OpenBufferResponse),
186    (OpenBufferForSymbol, OpenBufferForSymbolResponse),
187    (Ping, Ack),
188    (PerformRename, PerformRenameResponse),
189    (PrepareRename, PrepareRenameResponse),
190    (RegisterProject, RegisterProjectResponse),
191    (RegisterWorktree, Ack),
192    (ReloadBuffers, ReloadBuffersResponse),
193    (RequestContact, Ack),
194    (RemoveContact, Ack),
195    (RespondToContactRequest, Ack),
196    (RenameProjectEntry, ProjectEntryResponse),
197    (SaveBuffer, BufferSaved),
198    (SearchProject, SearchProjectResponse),
199    (SendChannelMessage, SendChannelMessageResponse),
200    (Test, Test),
201    (UpdateBuffer, Ack),
202    (UpdateWorktree, Ack),
203);
204
205entity_messages!(
206    project_id,
207    AddProjectCollaborator,
208    ApplyCodeAction,
209    ApplyCompletionAdditionalEdits,
210    BufferReloaded,
211    BufferSaved,
212    CreateProjectEntry,
213    RenameProjectEntry,
214    DeleteProjectEntry,
215    Follow,
216    FormatBuffers,
217    GetCodeActions,
218    GetCompletions,
219    GetDefinition,
220    GetDocumentHighlights,
221    GetReferences,
222    GetProjectSymbols,
223    JoinProject,
224    JoinProjectRequestCancelled,
225    LeaveProject,
226    OpenBufferById,
227    OpenBufferByPath,
228    OpenBufferForSymbol,
229    PerformRename,
230    PrepareRename,
231    ProjectUnshared,
232    ReloadBuffers,
233    RemoveProjectCollaborator,
234    RequestJoinProject,
235    SaveBuffer,
236    SearchProject,
237    StartLanguageServer,
238    Unfollow,
239    UnregisterProject,
240    UnregisterWorktree,
241    UpdateBuffer,
242    UpdateBufferFile,
243    UpdateDiagnosticSummary,
244    UpdateFollowers,
245    UpdateLanguageServer,
246    RegisterWorktree,
247    UpdateWorktree,
248);
249
250entity_messages!(channel_id, ChannelMessageSent);
251
252/// A stream of protobuf messages.
253pub struct MessageStream<S> {
254    stream: S,
255    encoding_buffer: Vec<u8>,
256}
257
258#[derive(Debug)]
259pub enum Message {
260    Envelope(Envelope),
261    Ping,
262    Pong,
263}
264
265impl<S> MessageStream<S> {
266    pub fn new(stream: S) -> Self {
267        Self {
268            stream,
269            encoding_buffer: Vec::new(),
270        }
271    }
272
273    pub fn inner_mut(&mut self) -> &mut S {
274        &mut self.stream
275    }
276}
277
278impl<S> MessageStream<S>
279where
280    S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
281{
282    pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
283        #[cfg(any(test, feature = "test-support"))]
284        const COMPRESSION_LEVEL: i32 = -7;
285
286        #[cfg(not(any(test, feature = "test-support")))]
287        const COMPRESSION_LEVEL: i32 = 4;
288
289        match message {
290            Message::Envelope(message) => {
291                self.encoding_buffer.resize(message.encoded_len(), 0);
292                self.encoding_buffer.clear();
293                message
294                    .encode(&mut self.encoding_buffer)
295                    .map_err(|err| io::Error::from(err))?;
296                let buffer =
297                    zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
298                        .unwrap();
299                self.stream.send(WebSocketMessage::Binary(buffer)).await?;
300            }
301            Message::Ping => {
302                self.stream
303                    .send(WebSocketMessage::Ping(Default::default()))
304                    .await?;
305            }
306            Message::Pong => {
307                self.stream
308                    .send(WebSocketMessage::Pong(Default::default()))
309                    .await?;
310            }
311        }
312
313        Ok(())
314    }
315}
316
317impl<S> MessageStream<S>
318where
319    S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
320{
321    pub async fn read(&mut self) -> Result<Message, anyhow::Error> {
322        while let Some(bytes) = self.stream.next().await {
323            match bytes? {
324                WebSocketMessage::Binary(bytes) => {
325                    self.encoding_buffer.clear();
326                    zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
327                    let envelope = Envelope::decode(self.encoding_buffer.as_slice())
328                        .map_err(io::Error::from)?;
329                    return Ok(Message::Envelope(envelope));
330                }
331                WebSocketMessage::Ping(_) => return Ok(Message::Ping),
332                WebSocketMessage::Pong(_) => return Ok(Message::Pong),
333                WebSocketMessage::Close(_) => break,
334                _ => {}
335            }
336        }
337        Err(anyhow!("connection closed"))
338    }
339}
340
341impl Into<SystemTime> for Timestamp {
342    fn into(self) -> SystemTime {
343        UNIX_EPOCH
344            .checked_add(Duration::new(self.seconds, self.nanos))
345            .unwrap()
346    }
347}
348
349impl From<SystemTime> for Timestamp {
350    fn from(time: SystemTime) -> Self {
351        let duration = time.duration_since(UNIX_EPOCH).unwrap();
352        Self {
353            seconds: duration.as_secs(),
354            nanos: duration.subsec_nanos(),
355        }
356    }
357}
358
359impl From<u128> for Nonce {
360    fn from(nonce: u128) -> Self {
361        let upper_half = (nonce >> 64) as u64;
362        let lower_half = nonce as u64;
363        Self {
364            upper_half,
365            lower_half,
366        }
367    }
368}
369
370impl From<Nonce> for u128 {
371    fn from(nonce: Nonce) -> Self {
372        let upper_half = (nonce.upper_half as u128) << 64;
373        let lower_half = nonce.lower_half as u128;
374        upper_half | lower_half
375    }
376}