proto.rs

  1use super::{entity_messages, messages, request_messages, ConnectionId, PeerId, TypedEnvelope};
  2use anyhow::{anyhow, Result};
  3use async_tungstenite::tungstenite::Message as WebSocketMessage;
  4use futures::{SinkExt as _, StreamExt as _};
  5use prost::Message as _;
  6use serde::Serialize;
  7use std::any::{Any, TypeId};
  8use std::{
  9    fmt::Debug,
 10    io,
 11    time::{Duration, SystemTime, UNIX_EPOCH},
 12};
 13
 14include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
 15
 16pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
 17    const NAME: &'static str;
 18    const PRIORITY: MessagePriority;
 19    fn into_envelope(
 20        self,
 21        id: u32,
 22        responding_to: Option<u32>,
 23        original_sender_id: Option<u32>,
 24    ) -> Envelope;
 25    fn from_envelope(envelope: Envelope) -> Option<Self>;
 26}
 27
 28pub trait EntityMessage: EnvelopedMessage {
 29    fn remote_entity_id(&self) -> u64;
 30}
 31
 32pub trait RequestMessage: EnvelopedMessage {
 33    type Response: EnvelopedMessage;
 34}
 35
 36pub trait AnyTypedEnvelope: 'static + Send + Sync {
 37    fn payload_type_id(&self) -> TypeId;
 38    fn payload_type_name(&self) -> &'static str;
 39    fn as_any(&self) -> &dyn Any;
 40    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
 41    fn is_background(&self) -> bool;
 42    fn original_sender_id(&self) -> Option<PeerId>;
 43}
 44
 45pub enum MessagePriority {
 46    Foreground,
 47    Background,
 48}
 49
 50impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
 51    fn payload_type_id(&self) -> TypeId {
 52        TypeId::of::<T>()
 53    }
 54
 55    fn payload_type_name(&self) -> &'static str {
 56        T::NAME
 57    }
 58
 59    fn as_any(&self) -> &dyn Any {
 60        self
 61    }
 62
 63    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
 64        self
 65    }
 66
 67    fn is_background(&self) -> bool {
 68        matches!(T::PRIORITY, MessagePriority::Background)
 69    }
 70
 71    fn original_sender_id(&self) -> Option<PeerId> {
 72        self.original_sender_id
 73    }
 74}
 75
 76messages!(
 77    (Ack, Foreground),
 78    (AddProjectCollaborator, Foreground),
 79    (ApplyCodeAction, Background),
 80    (ApplyCodeActionResponse, Background),
 81    (ApplyCompletionAdditionalEdits, Background),
 82    (ApplyCompletionAdditionalEditsResponse, Background),
 83    (BufferReloaded, Foreground),
 84    (BufferSaved, Foreground),
 85    (RemoveContact, Foreground),
 86    (ChannelMessageSent, Foreground),
 87    (CreateProjectEntry, Foreground),
 88    (DeleteProjectEntry, Foreground),
 89    (Error, Foreground),
 90    (Follow, Foreground),
 91    (FollowResponse, Foreground),
 92    (FormatBuffers, Foreground),
 93    (FormatBuffersResponse, Foreground),
 94    (FuzzySearchUsers, Foreground),
 95    (GetChannelMessages, Foreground),
 96    (GetChannelMessagesResponse, Foreground),
 97    (GetChannels, Foreground),
 98    (GetChannelsResponse, Foreground),
 99    (GetCodeActions, Background),
100    (GetCodeActionsResponse, Background),
101    (GetCompletions, Background),
102    (GetCompletionsResponse, Background),
103    (GetDefinition, Background),
104    (GetDefinitionResponse, Background),
105    (GetDocumentHighlights, Background),
106    (GetDocumentHighlightsResponse, Background),
107    (GetReferences, Background),
108    (GetReferencesResponse, Background),
109    (GetProjectSymbols, Background),
110    (GetProjectSymbolsResponse, Background),
111    (GetUsers, Foreground),
112    (UsersResponse, Foreground),
113    (JoinChannel, Foreground),
114    (JoinChannelResponse, Foreground),
115    (JoinProject, Foreground),
116    (JoinProjectResponse, Foreground),
117    (JoinProjectRequestCancelled, Foreground),
118    (LeaveChannel, Foreground),
119    (LeaveProject, Foreground),
120    (OpenBufferById, Background),
121    (OpenBufferByPath, Background),
122    (OpenBufferForSymbol, Background),
123    (OpenBufferForSymbolResponse, Background),
124    (OpenBufferResponse, Background),
125    (PerformRename, Background),
126    (PerformRenameResponse, Background),
127    (PrepareRename, Background),
128    (PrepareRenameResponse, Background),
129    (ProjectEntryResponse, Foreground),
130    (RegisterProjectResponse, Foreground),
131    (Ping, Foreground),
132    (ProjectUnshared, Foreground),
133    (RegisterProject, Foreground),
134    (RegisterWorktree, Foreground),
135    (ReloadBuffers, Foreground),
136    (ReloadBuffersResponse, Foreground),
137    (RemoveProjectCollaborator, Foreground),
138    (RenameProjectEntry, Foreground),
139    (RequestContact, Foreground),
140    (RequestJoinProject, Foreground),
141    (RespondToContactRequest, Foreground),
142    (RespondToJoinProjectRequest, Foreground),
143    (SaveBuffer, Foreground),
144    (SearchProject, Background),
145    (SearchProjectResponse, Background),
146    (SendChannelMessage, Foreground),
147    (SendChannelMessageResponse, Foreground),
148    (ShowContacts, Foreground),
149    (StartLanguageServer, Foreground),
150    (Test, Foreground),
151    (Unfollow, Foreground),
152    (UnregisterProject, Foreground),
153    (UnregisterWorktree, Foreground),
154    (UpdateBuffer, Foreground),
155    (UpdateBufferFile, Foreground),
156    (UpdateContacts, Foreground),
157    (UpdateDiagnosticSummary, Foreground),
158    (UpdateFollowers, Foreground),
159    (UpdateInviteInfo, Foreground),
160    (UpdateLanguageServer, Foreground),
161    (UpdateWorktree, Foreground),
162);
163
164request_messages!(
165    (ApplyCodeAction, ApplyCodeActionResponse),
166    (
167        ApplyCompletionAdditionalEdits,
168        ApplyCompletionAdditionalEditsResponse
169    ),
170    (CreateProjectEntry, ProjectEntryResponse),
171    (DeleteProjectEntry, ProjectEntryResponse),
172    (Follow, FollowResponse),
173    (FormatBuffers, FormatBuffersResponse),
174    (GetChannelMessages, GetChannelMessagesResponse),
175    (GetChannels, GetChannelsResponse),
176    (GetCodeActions, GetCodeActionsResponse),
177    (GetCompletions, GetCompletionsResponse),
178    (GetDefinition, GetDefinitionResponse),
179    (GetDocumentHighlights, GetDocumentHighlightsResponse),
180    (GetReferences, GetReferencesResponse),
181    (GetProjectSymbols, GetProjectSymbolsResponse),
182    (FuzzySearchUsers, UsersResponse),
183    (GetUsers, UsersResponse),
184    (JoinChannel, JoinChannelResponse),
185    (JoinProject, JoinProjectResponse),
186    (OpenBufferById, OpenBufferResponse),
187    (OpenBufferByPath, OpenBufferResponse),
188    (OpenBufferForSymbol, OpenBufferForSymbolResponse),
189    (Ping, Ack),
190    (PerformRename, PerformRenameResponse),
191    (PrepareRename, PrepareRenameResponse),
192    (RegisterProject, RegisterProjectResponse),
193    (RegisterWorktree, Ack),
194    (ReloadBuffers, ReloadBuffersResponse),
195    (RequestContact, Ack),
196    (RemoveContact, Ack),
197    (RespondToContactRequest, Ack),
198    (RenameProjectEntry, ProjectEntryResponse),
199    (SaveBuffer, BufferSaved),
200    (SearchProject, SearchProjectResponse),
201    (SendChannelMessage, SendChannelMessageResponse),
202    (Test, Test),
203    (UpdateBuffer, Ack),
204    (UpdateWorktree, Ack),
205);
206
207entity_messages!(
208    project_id,
209    AddProjectCollaborator,
210    ApplyCodeAction,
211    ApplyCompletionAdditionalEdits,
212    BufferReloaded,
213    BufferSaved,
214    CreateProjectEntry,
215    RenameProjectEntry,
216    DeleteProjectEntry,
217    Follow,
218    FormatBuffers,
219    GetCodeActions,
220    GetCompletions,
221    GetDefinition,
222    GetDocumentHighlights,
223    GetReferences,
224    GetProjectSymbols,
225    JoinProject,
226    JoinProjectRequestCancelled,
227    LeaveProject,
228    OpenBufferById,
229    OpenBufferByPath,
230    OpenBufferForSymbol,
231    PerformRename,
232    PrepareRename,
233    ProjectUnshared,
234    ReloadBuffers,
235    RemoveProjectCollaborator,
236    RequestJoinProject,
237    SaveBuffer,
238    SearchProject,
239    StartLanguageServer,
240    Unfollow,
241    UnregisterProject,
242    UnregisterWorktree,
243    UpdateBuffer,
244    UpdateBufferFile,
245    UpdateDiagnosticSummary,
246    UpdateFollowers,
247    UpdateLanguageServer,
248    RegisterWorktree,
249    UpdateWorktree,
250);
251
252entity_messages!(channel_id, ChannelMessageSent);
253
254/// A stream of protobuf messages.
255pub struct MessageStream<S> {
256    stream: S,
257    encoding_buffer: Vec<u8>,
258}
259
260#[derive(Debug)]
261pub enum Message {
262    Envelope(Envelope),
263    Ping,
264    Pong,
265}
266
267impl<S> MessageStream<S> {
268    pub fn new(stream: S) -> Self {
269        Self {
270            stream,
271            encoding_buffer: Vec::new(),
272        }
273    }
274
275    pub fn inner_mut(&mut self) -> &mut S {
276        &mut self.stream
277    }
278}
279
280impl<S> MessageStream<S>
281where
282    S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
283{
284    pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
285        #[cfg(any(test, feature = "test-support"))]
286        const COMPRESSION_LEVEL: i32 = -7;
287
288        #[cfg(not(any(test, feature = "test-support")))]
289        const COMPRESSION_LEVEL: i32 = 4;
290
291        match message {
292            Message::Envelope(message) => {
293                self.encoding_buffer.resize(message.encoded_len(), 0);
294                self.encoding_buffer.clear();
295                message
296                    .encode(&mut self.encoding_buffer)
297                    .map_err(|err| io::Error::from(err))?;
298                let buffer =
299                    zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
300                        .unwrap();
301                self.stream.send(WebSocketMessage::Binary(buffer)).await?;
302            }
303            Message::Ping => {
304                self.stream
305                    .send(WebSocketMessage::Ping(Default::default()))
306                    .await?;
307            }
308            Message::Pong => {
309                self.stream
310                    .send(WebSocketMessage::Pong(Default::default()))
311                    .await?;
312            }
313        }
314
315        Ok(())
316    }
317}
318
319impl<S> MessageStream<S>
320where
321    S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
322{
323    pub async fn read(&mut self) -> Result<Message, anyhow::Error> {
324        while let Some(bytes) = self.stream.next().await {
325            match bytes? {
326                WebSocketMessage::Binary(bytes) => {
327                    self.encoding_buffer.clear();
328                    zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
329                    let envelope = Envelope::decode(self.encoding_buffer.as_slice())
330                        .map_err(io::Error::from)?;
331                    return Ok(Message::Envelope(envelope));
332                }
333                WebSocketMessage::Ping(_) => return Ok(Message::Ping),
334                WebSocketMessage::Pong(_) => return Ok(Message::Pong),
335                WebSocketMessage::Close(_) => break,
336                _ => {}
337            }
338        }
339        Err(anyhow!("connection closed"))
340    }
341}
342
343impl Into<SystemTime> for Timestamp {
344    fn into(self) -> SystemTime {
345        UNIX_EPOCH
346            .checked_add(Duration::new(self.seconds, self.nanos))
347            .unwrap()
348    }
349}
350
351impl From<SystemTime> for Timestamp {
352    fn from(time: SystemTime) -> Self {
353        let duration = time.duration_since(UNIX_EPOCH).unwrap();
354        Self {
355            seconds: duration.as_secs(),
356            nanos: duration.subsec_nanos(),
357        }
358    }
359}
360
361impl From<u128> for Nonce {
362    fn from(nonce: u128) -> Self {
363        let upper_half = (nonce >> 64) as u64;
364        let lower_half = nonce as u64;
365        Self {
366            upper_half,
367            lower_half,
368        }
369    }
370}
371
372impl From<Nonce> for u128 {
373    fn from(nonce: Nonce) -> Self {
374        let upper_half = (nonce.upper_half as u128) << 64;
375        let lower_half = nonce.lower_half as u128;
376        upper_half | lower_half
377    }
378}