proto.rs

  1use super::{entity_messages, messages, request_messages, ConnectionId, PeerId, TypedEnvelope};
  2use anyhow::{anyhow, Result};
  3use async_tungstenite::tungstenite::Message as WebSocketMessage;
  4use futures::{SinkExt as _, StreamExt as _};
  5use prost::Message as _;
  6use serde::Serialize;
  7use std::any::{Any, TypeId};
  8use std::{
  9    fmt::Debug,
 10    io,
 11    time::{Duration, SystemTime, UNIX_EPOCH},
 12};
 13
 14include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
 15
/// A message type that can be packed into, and recovered from, an [`Envelope`].
///
/// The associated constants feed the type-erased [`AnyTypedEnvelope`] view:
/// `payload_type_name` reports `NAME` and `is_background` is derived from
/// `PRIORITY`.
pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
    /// Name reported for this message type.
    const NAME: &'static str;
    /// Priority class of this message type.
    const PRIORITY: MessagePriority;
    /// Consumes the message and wraps it in an [`Envelope`].
    ///
    /// NOTE(review): presumably `id` identifies this message, `responding_to`
    /// is the id of the request being answered (if any), and
    /// `original_sender_id` is the peer that first sent it. The implementations
    /// are not in this file (likely generated by `messages!`) — confirm there.
    fn into_envelope(
        self,
        id: u32,
        responding_to: Option<u32>,
        original_sender_id: Option<u32>,
    ) -> Envelope;
    /// Attempts to extract a message of this type from `envelope`.
    ///
    /// NOTE(review): presumably `None` means the envelope carries a different
    /// payload type — confirm against the generated implementation.
    fn from_envelope(envelope: Envelope) -> Option<Self>;
}
 27
/// An [`EnvelopedMessage`] that carries the id of a remote entity.
///
/// NOTE(review): the `entity_messages!` invocations in this file name a field
/// (`project_id`, `channel_id`) followed by message types; presumably that
/// field is what `remote_entity_id` returns — confirm in the macro definition.
pub trait EntityMessage: EnvelopedMessage {
    /// Returns the id of the remote entity this message refers to.
    fn remote_entity_id(&self) -> u64;
}
 31
/// An [`EnvelopedMessage`] that has an associated response message type.
pub trait RequestMessage: EnvelopedMessage {
    /// The message type associated with this request as its response.
    type Response: EnvelopedMessage;
}
 35
/// Type-erased interface over a typed envelope.
///
/// This file implements it for `TypedEnvelope<T>` for every
/// `T: EnvelopedMessage`, so callers can inspect an envelope without knowing
/// its payload type and downcast it later through [`Any`].
pub trait AnyTypedEnvelope: 'static + Send + Sync {
    /// [`TypeId`] of the payload type.
    fn payload_type_id(&self) -> TypeId;
    /// Name of the payload type.
    fn payload_type_name(&self) -> &'static str;
    /// Borrows the envelope as [`Any`] so it can be downcast by reference.
    fn as_any(&self) -> &dyn Any;
    /// Converts the boxed envelope into a boxed [`Any`] so it can be downcast by value.
    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
    /// Whether the payload type has background priority.
    fn is_background(&self) -> bool;
    /// The original sender recorded on the envelope, if any.
    fn original_sender_id(&self) -> Option<PeerId>;
}
 44
/// Priority class assigned to each message type via `EnvelopedMessage::PRIORITY`.
///
/// `AnyTypedEnvelope::is_background` reports `true` exactly for `Background`.
/// NOTE(review): how the two classes are scheduled is decided by code outside
/// this file.
pub enum MessagePriority {
    /// Not a background message (`is_background` returns `false`).
    Foreground,
    /// Background message (`is_background` returns `true`).
    Background,
}
 49
 50impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
 51    fn payload_type_id(&self) -> TypeId {
 52        TypeId::of::<T>()
 53    }
 54
 55    fn payload_type_name(&self) -> &'static str {
 56        T::NAME
 57    }
 58
 59    fn as_any(&self) -> &dyn Any {
 60        self
 61    }
 62
 63    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
 64        self
 65    }
 66
 67    fn is_background(&self) -> bool {
 68        matches!(T::PRIORITY, MessagePriority::Background)
 69    }
 70
 71    fn original_sender_id(&self) -> Option<PeerId> {
 72        self.original_sender_id
 73    }
 74}
 75
// Declares every protobuf message type used on the wire together with its
// `MessagePriority` variant (`Foreground` or `Background`).
//
// NOTE(review): `messages!` is defined in the parent module; presumably it
// generates the `EnvelopedMessage` impl (including `NAME` and `PRIORITY`) for
// each listed type — confirm in the macro definition.
messages!(
    (Ack, Foreground),
    (AddProjectCollaborator, Foreground),
    (ApplyCodeAction, Background),
    (ApplyCodeActionResponse, Background),
    (ApplyCompletionAdditionalEdits, Background),
    (ApplyCompletionAdditionalEditsResponse, Background),
    (BufferReloaded, Foreground),
    (BufferSaved, Foreground),
    (ChannelMessageSent, Foreground),
    (CreateProjectEntry, Foreground),
    (DeleteProjectEntry, Foreground),
    (Error, Foreground),
    (Follow, Foreground),
    (FollowResponse, Foreground),
    (FormatBuffers, Foreground),
    (FormatBuffersResponse, Foreground),
    (FuzzySearchUsers, Foreground),
    (GetChannelMessages, Foreground),
    (GetChannelMessagesResponse, Foreground),
    (GetChannels, Foreground),
    (GetChannelsResponse, Foreground),
    (GetCodeActions, Background),
    (GetCodeActionsResponse, Background),
    (GetCompletions, Background),
    (GetCompletionsResponse, Background),
    (GetDefinition, Background),
    (GetDefinitionResponse, Background),
    (GetDocumentHighlights, Background),
    (GetDocumentHighlightsResponse, Background),
    (GetReferences, Background),
    (GetReferencesResponse, Background),
    (GetProjectSymbols, Background),
    (GetProjectSymbolsResponse, Background),
    (GetUsers, Foreground),
    (UsersResponse, Foreground),
    (JoinChannel, Foreground),
    (JoinChannelResponse, Foreground),
    (JoinProject, Foreground),
    (JoinProjectResponse, Foreground),
    (LeaveChannel, Foreground),
    (LeaveProject, Foreground),
    (OpenBufferById, Background),
    (OpenBufferByPath, Background),
    (OpenBufferForSymbol, Background),
    (OpenBufferForSymbolResponse, Background),
    (OpenBufferResponse, Background),
    (PerformRename, Background),
    (PerformRenameResponse, Background),
    (PrepareRename, Background),
    (PrepareRenameResponse, Background),
    (ProjectEntryResponse, Foreground),
    (RegisterProjectResponse, Foreground),
    (Ping, Foreground),
    (RegisterProject, Foreground),
    (RegisterWorktree, Foreground),
    (ReloadBuffers, Foreground),
    (ReloadBuffersResponse, Foreground),
    (RemoveProjectCollaborator, Foreground),
    (RenameProjectEntry, Foreground),
    (RequestContact, Foreground),
    (RespondToContactRequest, Foreground),
    (SaveBuffer, Foreground),
    (SearchProject, Background),
    (SearchProjectResponse, Background),
    (SendChannelMessage, Foreground),
    (SendChannelMessageResponse, Foreground),
    (ShareProject, Foreground),
    (StartLanguageServer, Foreground),
    (Test, Foreground),
    (Unfollow, Foreground),
    (UnregisterProject, Foreground),
    (UnregisterWorktree, Foreground),
    (UnshareProject, Foreground),
    (UpdateBuffer, Foreground),
    (UpdateBufferFile, Foreground),
    (UpdateContacts, Foreground),
    (UpdateDiagnosticSummary, Foreground),
    (UpdateFollowers, Foreground),
    (UpdateLanguageServer, Foreground),
    (UpdateWorktree, Foreground),
);
158
// Pairs each request message type (first element) with the message type sent
// back in response (second element). Several requests share a response type
// (e.g. `Ack`, `ProjectEntryResponse`, `UsersResponse`, `OpenBufferResponse`).
//
// NOTE(review): `request_messages!` is defined in the parent module;
// presumably it generates `RequestMessage` impls with `Response` set to the
// second element — confirm in the macro definition.
request_messages!(
    (ApplyCodeAction, ApplyCodeActionResponse),
    (
        ApplyCompletionAdditionalEdits,
        ApplyCompletionAdditionalEditsResponse
    ),
    (CreateProjectEntry, ProjectEntryResponse),
    (DeleteProjectEntry, ProjectEntryResponse),
    (Follow, FollowResponse),
    (FormatBuffers, FormatBuffersResponse),
    (GetChannelMessages, GetChannelMessagesResponse),
    (GetChannels, GetChannelsResponse),
    (GetCodeActions, GetCodeActionsResponse),
    (GetCompletions, GetCompletionsResponse),
    (GetDefinition, GetDefinitionResponse),
    (GetDocumentHighlights, GetDocumentHighlightsResponse),
    (GetReferences, GetReferencesResponse),
    (GetProjectSymbols, GetProjectSymbolsResponse),
    (FuzzySearchUsers, UsersResponse),
    (GetUsers, UsersResponse),
    (JoinChannel, JoinChannelResponse),
    (JoinProject, JoinProjectResponse),
    (OpenBufferById, OpenBufferResponse),
    (OpenBufferByPath, OpenBufferResponse),
    (OpenBufferForSymbol, OpenBufferForSymbolResponse),
    (Ping, Ack),
    (PerformRename, PerformRenameResponse),
    (PrepareRename, PrepareRenameResponse),
    (RegisterProject, RegisterProjectResponse),
    (RegisterWorktree, Ack),
    (ReloadBuffers, ReloadBuffersResponse),
    (RequestContact, Ack),
    (RespondToContactRequest, Ack),
    (RenameProjectEntry, ProjectEntryResponse),
    (SaveBuffer, BufferSaved),
    (SearchProject, SearchProjectResponse),
    (SendChannelMessage, SendChannelMessageResponse),
    (ShareProject, Ack),
    (Test, Test),
    (UpdateBuffer, Ack),
    (UpdateWorktree, Ack),
);
201
// Lists the message types associated with the `project_id` field: the first
// argument is a field name, the remaining arguments are message types.
//
// NOTE(review): `entity_messages!` is defined in the parent module; presumably
// it implements `EntityMessage` for each type with `remote_entity_id`
// returning that field — confirm in the macro definition.
entity_messages!(
    project_id,
    AddProjectCollaborator,
    ApplyCodeAction,
    ApplyCompletionAdditionalEdits,
    BufferReloaded,
    BufferSaved,
    CreateProjectEntry,
    RenameProjectEntry,
    DeleteProjectEntry,
    Follow,
    FormatBuffers,
    GetCodeActions,
    GetCompletions,
    GetDefinition,
    GetDocumentHighlights,
    GetReferences,
    GetProjectSymbols,
    JoinProject,
    LeaveProject,
    OpenBufferById,
    OpenBufferByPath,
    OpenBufferForSymbol,
    PerformRename,
    PrepareRename,
    ReloadBuffers,
    RemoveProjectCollaborator,
    SaveBuffer,
    SearchProject,
    StartLanguageServer,
    Unfollow,
    UnregisterWorktree,
    UnshareProject,
    UpdateBuffer,
    UpdateBufferFile,
    UpdateDiagnosticSummary,
    UpdateFollowers,
    UpdateLanguageServer,
    RegisterWorktree,
    UpdateWorktree,
);
243
// Same macro as the `project_id` list, here with the `channel_id` field and a
// single message type.
// NOTE(review): presumably makes `ChannelMessageSent` an `EntityMessage` keyed
// by its `channel_id` — confirm in the macro definition.
entity_messages!(channel_id, ChannelMessageSent);
245
/// A stream of protobuf messages.
///
/// Wraps an underlying WebSocket sink and/or stream `S`; `write` is available
/// when `S` is a `Sink` of WebSocket messages and `read` when it is a `Stream`
/// of them.
pub struct MessageStream<S> {
    // The wrapped WebSocket sink/stream.
    stream: S,
    // Scratch buffer reused across calls: `write` fills it with the encoded
    // envelope before compression, `read` with the decompressed bytes before
    // decoding.
    encoding_buffer: Vec<u8>,
}
251
/// A single item sent or received through a `MessageStream`.
#[derive(Debug)]
pub enum Message {
    /// A protobuf `Envelope`; travels as a compressed binary WebSocket message.
    Envelope(Envelope),
    /// A WebSocket ping frame (sent with an empty payload; any received payload is dropped).
    Ping,
    /// A WebSocket pong frame (sent with an empty payload; any received payload is dropped).
    Pong,
}
258
259impl<S> MessageStream<S> {
260    pub fn new(stream: S) -> Self {
261        Self {
262            stream,
263            encoding_buffer: Vec::new(),
264        }
265    }
266
267    pub fn inner_mut(&mut self) -> &mut S {
268        &mut self.stream
269    }
270}
271
272impl<S> MessageStream<S>
273where
274    S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
275{
276    pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
277        #[cfg(any(test, feature = "test-support"))]
278        const COMPRESSION_LEVEL: i32 = -7;
279
280        #[cfg(not(any(test, feature = "test-support")))]
281        const COMPRESSION_LEVEL: i32 = 4;
282
283        match message {
284            Message::Envelope(message) => {
285                self.encoding_buffer.resize(message.encoded_len(), 0);
286                self.encoding_buffer.clear();
287                message
288                    .encode(&mut self.encoding_buffer)
289                    .map_err(|err| io::Error::from(err))?;
290                let buffer =
291                    zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
292                        .unwrap();
293                self.stream.send(WebSocketMessage::Binary(buffer)).await?;
294            }
295            Message::Ping => {
296                self.stream
297                    .send(WebSocketMessage::Ping(Default::default()))
298                    .await?;
299            }
300            Message::Pong => {
301                self.stream
302                    .send(WebSocketMessage::Pong(Default::default()))
303                    .await?;
304            }
305        }
306
307        Ok(())
308    }
309}
310
311impl<S> MessageStream<S>
312where
313    S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
314{
315    pub async fn read(&mut self) -> Result<Message, anyhow::Error> {
316        while let Some(bytes) = self.stream.next().await {
317            match bytes? {
318                WebSocketMessage::Binary(bytes) => {
319                    self.encoding_buffer.clear();
320                    zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
321                    let envelope = Envelope::decode(self.encoding_buffer.as_slice())
322                        .map_err(io::Error::from)?;
323                    return Ok(Message::Envelope(envelope));
324                }
325                WebSocketMessage::Ping(_) => return Ok(Message::Ping),
326                WebSocketMessage::Pong(_) => return Ok(Message::Pong),
327                WebSocketMessage::Close(_) => break,
328                _ => {}
329            }
330        }
331        Err(anyhow!("connection closed"))
332    }
333}
334
335impl Into<SystemTime> for Timestamp {
336    fn into(self) -> SystemTime {
337        UNIX_EPOCH
338            .checked_add(Duration::new(self.seconds, self.nanos))
339            .unwrap()
340    }
341}
342
343impl From<SystemTime> for Timestamp {
344    fn from(time: SystemTime) -> Self {
345        let duration = time.duration_since(UNIX_EPOCH).unwrap();
346        Self {
347            seconds: duration.as_secs(),
348            nanos: duration.subsec_nanos(),
349        }
350    }
351}
352
353impl From<u128> for Nonce {
354    fn from(nonce: u128) -> Self {
355        let upper_half = (nonce >> 64) as u64;
356        let lower_half = nonce as u64;
357        Self {
358            upper_half,
359            lower_half,
360        }
361    }
362}
363
364impl From<Nonce> for u128 {
365    fn from(nonce: Nonce) -> Self {
366        let upper_half = (nonce.upper_half as u128) << 64;
367        let lower_half = nonce.lower_half as u128;
368        upper_half | lower_half
369    }
370}