proto.rs

  1use super::{entity_messages, messages, request_messages, ConnectionId, PeerId, TypedEnvelope};
  2use anyhow::{anyhow, Result};
  3use async_tungstenite::tungstenite::Message as WebSocketMessage;
  4use futures::{SinkExt as _, StreamExt as _};
  5use prost::Message as _;
  6use serde::Serialize;
  7use std::any::{Any, TypeId};
  8use std::{
  9    fmt::Debug,
 10    io,
 11    time::{Duration, SystemTime, UNIX_EPOCH},
 12};
 13
 14include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
 15
 16pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
 17    const NAME: &'static str;
 18    const PRIORITY: MessagePriority;
 19    fn into_envelope(
 20        self,
 21        id: u32,
 22        responding_to: Option<u32>,
 23        original_sender_id: Option<u32>,
 24    ) -> Envelope;
 25    fn from_envelope(envelope: Envelope) -> Option<Self>;
 26}
 27
 28pub trait EntityMessage: EnvelopedMessage {
 29    fn remote_entity_id(&self) -> u64;
 30}
 31
 32pub trait RequestMessage: EnvelopedMessage {
 33    type Response: EnvelopedMessage;
 34}
 35
 36pub trait AnyTypedEnvelope: 'static + Send + Sync {
 37    fn payload_type_id(&self) -> TypeId;
 38    fn payload_type_name(&self) -> &'static str;
 39    fn as_any(&self) -> &dyn Any;
 40    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
 41    fn is_background(&self) -> bool;
 42    fn original_sender_id(&self) -> Option<PeerId>;
 43}
 44
 45pub enum MessagePriority {
 46    Foreground,
 47    Background,
 48}
 49
 50impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
 51    fn payload_type_id(&self) -> TypeId {
 52        TypeId::of::<T>()
 53    }
 54
 55    fn payload_type_name(&self) -> &'static str {
 56        T::NAME
 57    }
 58
 59    fn as_any(&self) -> &dyn Any {
 60        self
 61    }
 62
 63    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
 64        self
 65    }
 66
 67    fn is_background(&self) -> bool {
 68        matches!(T::PRIORITY, MessagePriority::Background)
 69    }
 70
 71    fn original_sender_id(&self) -> Option<PeerId> {
 72        self.original_sender_id
 73    }
 74}
 75
 76messages!(
 77    (Ack, Foreground),
 78    (AddProjectCollaborator, Foreground),
 79    (ApplyCodeAction, Background),
 80    (ApplyCodeActionResponse, Background),
 81    (ApplyCompletionAdditionalEdits, Background),
 82    (ApplyCompletionAdditionalEditsResponse, Background),
 83    (BufferReloaded, Foreground),
 84    (BufferSaved, Foreground),
 85    (RemoveContact, Foreground),
 86    (ChannelMessageSent, Foreground),
 87    (CreateProjectEntry, Foreground),
 88    (DeleteProjectEntry, Foreground),
 89    (Error, Foreground),
 90    (Follow, Foreground),
 91    (FollowResponse, Foreground),
 92    (FormatBuffers, Foreground),
 93    (FormatBuffersResponse, Foreground),
 94    (FuzzySearchUsers, Foreground),
 95    (GetChannelMessages, Foreground),
 96    (GetChannelMessagesResponse, Foreground),
 97    (GetChannels, Foreground),
 98    (GetChannelsResponse, Foreground),
 99    (GetCodeActions, Background),
100    (GetCodeActionsResponse, Background),
101    (GetCompletions, Background),
102    (GetCompletionsResponse, Background),
103    (GetDefinition, Background),
104    (GetDefinitionResponse, Background),
105    (GetDocumentHighlights, Background),
106    (GetDocumentHighlightsResponse, Background),
107    (GetReferences, Background),
108    (GetReferencesResponse, Background),
109    (GetProjectSymbols, Background),
110    (GetProjectSymbolsResponse, Background),
111    (GetUsers, Foreground),
112    (UsersResponse, Foreground),
113    (JoinChannel, Foreground),
114    (JoinChannelResponse, Foreground),
115    (JoinProject, Foreground),
116    (JoinProjectResponse, Foreground),
117    (LeaveChannel, Foreground),
118    (LeaveProject, Foreground),
119    (OpenBufferById, Background),
120    (OpenBufferByPath, Background),
121    (OpenBufferForSymbol, Background),
122    (OpenBufferForSymbolResponse, Background),
123    (OpenBufferResponse, Background),
124    (PerformRename, Background),
125    (PerformRenameResponse, Background),
126    (PrepareRename, Background),
127    (PrepareRenameResponse, Background),
128    (ProjectEntryResponse, Foreground),
129    (RegisterProjectResponse, Foreground),
130    (Ping, Foreground),
131    (ProjectUnshared, Foreground),
132    (RegisterProject, Foreground),
133    (RegisterWorktree, Foreground),
134    (ReloadBuffers, Foreground),
135    (ReloadBuffersResponse, Foreground),
136    (RemoveProjectCollaborator, Foreground),
137    (RenameProjectEntry, Foreground),
138    (RequestContact, Foreground),
139    (RequestJoinProject, Foreground),
140    (RespondToContactRequest, Foreground),
141    (RespondToJoinProjectRequest, Foreground),
142    (SaveBuffer, Foreground),
143    (SearchProject, Background),
144    (SearchProjectResponse, Background),
145    (SendChannelMessage, Foreground),
146    (SendChannelMessageResponse, Foreground),
147    (StartLanguageServer, Foreground),
148    (Test, Foreground),
149    (Unfollow, Foreground),
150    (UnregisterProject, Foreground),
151    (UnregisterWorktree, Foreground),
152    (UpdateBuffer, Foreground),
153    (UpdateBufferFile, Foreground),
154    (UpdateContacts, Foreground),
155    (UpdateDiagnosticSummary, Foreground),
156    (UpdateFollowers, Foreground),
157    (UpdateLanguageServer, Foreground),
158    (UpdateWorktree, Foreground),
159);
160
161request_messages!(
162    (ApplyCodeAction, ApplyCodeActionResponse),
163    (
164        ApplyCompletionAdditionalEdits,
165        ApplyCompletionAdditionalEditsResponse
166    ),
167    (CreateProjectEntry, ProjectEntryResponse),
168    (DeleteProjectEntry, ProjectEntryResponse),
169    (Follow, FollowResponse),
170    (FormatBuffers, FormatBuffersResponse),
171    (GetChannelMessages, GetChannelMessagesResponse),
172    (GetChannels, GetChannelsResponse),
173    (GetCodeActions, GetCodeActionsResponse),
174    (GetCompletions, GetCompletionsResponse),
175    (GetDefinition, GetDefinitionResponse),
176    (GetDocumentHighlights, GetDocumentHighlightsResponse),
177    (GetReferences, GetReferencesResponse),
178    (GetProjectSymbols, GetProjectSymbolsResponse),
179    (FuzzySearchUsers, UsersResponse),
180    (GetUsers, UsersResponse),
181    (JoinChannel, JoinChannelResponse),
182    (JoinProject, JoinProjectResponse),
183    (OpenBufferById, OpenBufferResponse),
184    (OpenBufferByPath, OpenBufferResponse),
185    (OpenBufferForSymbol, OpenBufferForSymbolResponse),
186    (Ping, Ack),
187    (PerformRename, PerformRenameResponse),
188    (PrepareRename, PrepareRenameResponse),
189    (RegisterProject, RegisterProjectResponse),
190    (RegisterWorktree, Ack),
191    (ReloadBuffers, ReloadBuffersResponse),
192    (RequestContact, Ack),
193    (RemoveContact, Ack),
194    (RespondToContactRequest, Ack),
195    (RenameProjectEntry, ProjectEntryResponse),
196    (SaveBuffer, BufferSaved),
197    (SearchProject, SearchProjectResponse),
198    (SendChannelMessage, SendChannelMessageResponse),
199    (Test, Test),
200    (UpdateBuffer, Ack),
201    (UpdateWorktree, Ack),
202);
203
204entity_messages!(
205    project_id,
206    AddProjectCollaborator,
207    ApplyCodeAction,
208    ApplyCompletionAdditionalEdits,
209    BufferReloaded,
210    BufferSaved,
211    CreateProjectEntry,
212    RenameProjectEntry,
213    DeleteProjectEntry,
214    Follow,
215    FormatBuffers,
216    GetCodeActions,
217    GetCompletions,
218    GetDefinition,
219    GetDocumentHighlights,
220    GetReferences,
221    GetProjectSymbols,
222    JoinProject,
223    LeaveProject,
224    OpenBufferById,
225    OpenBufferByPath,
226    OpenBufferForSymbol,
227    PerformRename,
228    PrepareRename,
229    ProjectUnshared,
230    ReloadBuffers,
231    RemoveProjectCollaborator,
232    RequestJoinProject,
233    SaveBuffer,
234    SearchProject,
235    StartLanguageServer,
236    Unfollow,
237    UnregisterProject,
238    UnregisterWorktree,
239    UpdateBuffer,
240    UpdateBufferFile,
241    UpdateDiagnosticSummary,
242    UpdateFollowers,
243    UpdateLanguageServer,
244    RegisterWorktree,
245    UpdateWorktree,
246);
247
248entity_messages!(channel_id, ChannelMessageSent);
249
250/// A stream of protobuf messages.
251pub struct MessageStream<S> {
252    stream: S,
253    encoding_buffer: Vec<u8>,
254}
255
256#[derive(Debug)]
257pub enum Message {
258    Envelope(Envelope),
259    Ping,
260    Pong,
261}
262
263impl<S> MessageStream<S> {
264    pub fn new(stream: S) -> Self {
265        Self {
266            stream,
267            encoding_buffer: Vec::new(),
268        }
269    }
270
271    pub fn inner_mut(&mut self) -> &mut S {
272        &mut self.stream
273    }
274}
275
276impl<S> MessageStream<S>
277where
278    S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
279{
280    pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
281        #[cfg(any(test, feature = "test-support"))]
282        const COMPRESSION_LEVEL: i32 = -7;
283
284        #[cfg(not(any(test, feature = "test-support")))]
285        const COMPRESSION_LEVEL: i32 = 4;
286
287        match message {
288            Message::Envelope(message) => {
289                self.encoding_buffer.resize(message.encoded_len(), 0);
290                self.encoding_buffer.clear();
291                message
292                    .encode(&mut self.encoding_buffer)
293                    .map_err(|err| io::Error::from(err))?;
294                let buffer =
295                    zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
296                        .unwrap();
297                self.stream.send(WebSocketMessage::Binary(buffer)).await?;
298            }
299            Message::Ping => {
300                self.stream
301                    .send(WebSocketMessage::Ping(Default::default()))
302                    .await?;
303            }
304            Message::Pong => {
305                self.stream
306                    .send(WebSocketMessage::Pong(Default::default()))
307                    .await?;
308            }
309        }
310
311        Ok(())
312    }
313}
314
315impl<S> MessageStream<S>
316where
317    S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
318{
319    pub async fn read(&mut self) -> Result<Message, anyhow::Error> {
320        while let Some(bytes) = self.stream.next().await {
321            match bytes? {
322                WebSocketMessage::Binary(bytes) => {
323                    self.encoding_buffer.clear();
324                    zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
325                    let envelope = Envelope::decode(self.encoding_buffer.as_slice())
326                        .map_err(io::Error::from)?;
327                    return Ok(Message::Envelope(envelope));
328                }
329                WebSocketMessage::Ping(_) => return Ok(Message::Ping),
330                WebSocketMessage::Pong(_) => return Ok(Message::Pong),
331                WebSocketMessage::Close(_) => break,
332                _ => {}
333            }
334        }
335        Err(anyhow!("connection closed"))
336    }
337}
338
339impl Into<SystemTime> for Timestamp {
340    fn into(self) -> SystemTime {
341        UNIX_EPOCH
342            .checked_add(Duration::new(self.seconds, self.nanos))
343            .unwrap()
344    }
345}
346
347impl From<SystemTime> for Timestamp {
348    fn from(time: SystemTime) -> Self {
349        let duration = time.duration_since(UNIX_EPOCH).unwrap();
350        Self {
351            seconds: duration.as_secs(),
352            nanos: duration.subsec_nanos(),
353        }
354    }
355}
356
357impl From<u128> for Nonce {
358    fn from(nonce: u128) -> Self {
359        let upper_half = (nonce >> 64) as u64;
360        let lower_half = nonce as u64;
361        Self {
362            upper_half,
363            lower_half,
364        }
365    }
366}
367
368impl From<Nonce> for u128 {
369    fn from(nonce: Nonce) -> Self {
370        let upper_half = (nonce.upper_half as u128) << 64;
371        let lower_half = nonce.lower_half as u128;
372        upper_half | lower_half
373    }
374}