proto.rs

  1use super::{entity_messages, messages, request_messages, ConnectionId, PeerId, TypedEnvelope};
  2use anyhow::{anyhow, Result};
  3use async_tungstenite::tungstenite::Message as WebSocketMessage;
  4use futures::{SinkExt as _, StreamExt as _};
  5use prost::Message as _;
  6use serde::Serialize;
  7use std::any::{Any, TypeId};
  8use std::{cmp, iter, mem};
  9use std::{
 10    fmt::Debug,
 11    io,
 12    time::{Duration, SystemTime, UNIX_EPOCH},
 13};
 14
 15include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
 16
 17pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
 18    const NAME: &'static str;
 19    const PRIORITY: MessagePriority;
 20    fn into_envelope(
 21        self,
 22        id: u32,
 23        responding_to: Option<u32>,
 24        original_sender_id: Option<u32>,
 25    ) -> Envelope;
 26    fn from_envelope(envelope: Envelope) -> Option<Self>;
 27}
 28
 29pub trait EntityMessage: EnvelopedMessage {
 30    fn remote_entity_id(&self) -> u64;
 31}
 32
 33pub trait RequestMessage: EnvelopedMessage {
 34    type Response: EnvelopedMessage;
 35}
 36
 37pub trait AnyTypedEnvelope: 'static + Send + Sync {
 38    fn payload_type_id(&self) -> TypeId;
 39    fn payload_type_name(&self) -> &'static str;
 40    fn as_any(&self) -> &dyn Any;
 41    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
 42    fn is_background(&self) -> bool;
 43    fn original_sender_id(&self) -> Option<PeerId>;
 44}
 45
 46pub enum MessagePriority {
 47    Foreground,
 48    Background,
 49}
 50
 51impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
 52    fn payload_type_id(&self) -> TypeId {
 53        TypeId::of::<T>()
 54    }
 55
 56    fn payload_type_name(&self) -> &'static str {
 57        T::NAME
 58    }
 59
 60    fn as_any(&self) -> &dyn Any {
 61        self
 62    }
 63
 64    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
 65        self
 66    }
 67
 68    fn is_background(&self) -> bool {
 69        matches!(T::PRIORITY, MessagePriority::Background)
 70    }
 71
 72    fn original_sender_id(&self) -> Option<PeerId> {
 73        self.original_sender_id
 74    }
 75}
 76
 77messages!(
 78    (Ack, Foreground),
 79    (AddProjectCollaborator, Foreground),
 80    (ApplyCodeAction, Background),
 81    (ApplyCodeActionResponse, Background),
 82    (ApplyCompletionAdditionalEdits, Background),
 83    (ApplyCompletionAdditionalEditsResponse, Background),
 84    (BufferReloaded, Foreground),
 85    (BufferSaved, Foreground),
 86    (RemoveContact, Foreground),
 87    (ChannelMessageSent, Foreground),
 88    (CopyProjectEntry, Foreground),
 89    (CreateProjectEntry, Foreground),
 90    (DeleteProjectEntry, Foreground),
 91    (Error, Foreground),
 92    (Follow, Foreground),
 93    (FollowResponse, Foreground),
 94    (FormatBuffers, Foreground),
 95    (FormatBuffersResponse, Foreground),
 96    (FuzzySearchUsers, Foreground),
 97    (GetChannelMessages, Foreground),
 98    (GetChannelMessagesResponse, Foreground),
 99    (GetChannels, Foreground),
100    (GetChannelsResponse, Foreground),
101    (GetCodeActions, Background),
102    (GetCodeActionsResponse, Background),
103    (GetHover, Background),
104    (GetHoverResponse, Background),
105    (GetCompletions, Background),
106    (GetCompletionsResponse, Background),
107    (GetDefinition, Background),
108    (GetDefinitionResponse, Background),
109    (GetDocumentHighlights, Background),
110    (GetDocumentHighlightsResponse, Background),
111    (GetReferences, Background),
112    (GetReferencesResponse, Background),
113    (GetProjectSymbols, Background),
114    (GetProjectSymbolsResponse, Background),
115    (GetUsers, Foreground),
116    (UsersResponse, Foreground),
117    (JoinChannel, Foreground),
118    (JoinChannelResponse, Foreground),
119    (JoinProject, Foreground),
120    (JoinProjectResponse, Foreground),
121    (JoinProjectRequestCancelled, Foreground),
122    (LeaveChannel, Foreground),
123    (LeaveProject, Foreground),
124    (OpenBufferById, Background),
125    (OpenBufferByPath, Background),
126    (OpenBufferForSymbol, Background),
127    (OpenBufferForSymbolResponse, Background),
128    (OpenBufferResponse, Background),
129    (PerformRename, Background),
130    (PerformRenameResponse, Background),
131    (PrepareRename, Background),
132    (PrepareRenameResponse, Background),
133    (ProjectEntryResponse, Foreground),
134    (ProjectUnshared, Foreground),
135    (RegisterProjectResponse, Foreground),
136    (Ping, Foreground),
137    (RegisterProject, Foreground),
138    (RegisterProjectActivity, Foreground),
139    (ReloadBuffers, Foreground),
140    (ReloadBuffersResponse, Foreground),
141    (RemoveProjectCollaborator, Foreground),
142    (RenameProjectEntry, Foreground),
143    (RequestContact, Foreground),
144    (RequestJoinProject, Foreground),
145    (RespondToContactRequest, Foreground),
146    (RespondToJoinProjectRequest, Foreground),
147    (SaveBuffer, Foreground),
148    (SearchProject, Background),
149    (SearchProjectResponse, Background),
150    (SendChannelMessage, Foreground),
151    (SendChannelMessageResponse, Foreground),
152    (ShowContacts, Foreground),
153    (StartLanguageServer, Foreground),
154    (Test, Foreground),
155    (Unfollow, Foreground),
156    (UnregisterProject, Foreground),
157    (UpdateBuffer, Foreground),
158    (UpdateBufferFile, Foreground),
159    (UpdateContacts, Foreground),
160    (UpdateDiagnosticSummary, Foreground),
161    (UpdateFollowers, Foreground),
162    (UpdateInviteInfo, Foreground),
163    (UpdateLanguageServer, Foreground),
164    (UpdateProject, Foreground),
165    (UpdateWorktree, Foreground),
166    (UpdateWorktreeExtensions, Background),
167);
168
169request_messages!(
170    (ApplyCodeAction, ApplyCodeActionResponse),
171    (
172        ApplyCompletionAdditionalEdits,
173        ApplyCompletionAdditionalEditsResponse
174    ),
175    (CopyProjectEntry, ProjectEntryResponse),
176    (CreateProjectEntry, ProjectEntryResponse),
177    (DeleteProjectEntry, ProjectEntryResponse),
178    (Follow, FollowResponse),
179    (FormatBuffers, FormatBuffersResponse),
180    (GetChannelMessages, GetChannelMessagesResponse),
181    (GetChannels, GetChannelsResponse),
182    (GetCodeActions, GetCodeActionsResponse),
183    (GetHover, GetHoverResponse),
184    (GetCompletions, GetCompletionsResponse),
185    (GetDefinition, GetDefinitionResponse),
186    (GetDocumentHighlights, GetDocumentHighlightsResponse),
187    (GetReferences, GetReferencesResponse),
188    (GetProjectSymbols, GetProjectSymbolsResponse),
189    (FuzzySearchUsers, UsersResponse),
190    (GetUsers, UsersResponse),
191    (JoinChannel, JoinChannelResponse),
192    (JoinProject, JoinProjectResponse),
193    (OpenBufferById, OpenBufferResponse),
194    (OpenBufferByPath, OpenBufferResponse),
195    (OpenBufferForSymbol, OpenBufferForSymbolResponse),
196    (Ping, Ack),
197    (PerformRename, PerformRenameResponse),
198    (PrepareRename, PrepareRenameResponse),
199    (RegisterProject, RegisterProjectResponse),
200    (ReloadBuffers, ReloadBuffersResponse),
201    (RequestContact, Ack),
202    (RemoveContact, Ack),
203    (RespondToContactRequest, Ack),
204    (RenameProjectEntry, ProjectEntryResponse),
205    (SaveBuffer, BufferSaved),
206    (SearchProject, SearchProjectResponse),
207    (SendChannelMessage, SendChannelMessageResponse),
208    (Test, Test),
209    (UnregisterProject, Ack),
210    (UpdateBuffer, Ack),
211    (UpdateWorktree, Ack),
212);
213
214entity_messages!(
215    project_id,
216    AddProjectCollaborator,
217    ApplyCodeAction,
218    ApplyCompletionAdditionalEdits,
219    BufferReloaded,
220    BufferSaved,
221    CopyProjectEntry,
222    CreateProjectEntry,
223    DeleteProjectEntry,
224    Follow,
225    FormatBuffers,
226    GetCodeActions,
227    GetCompletions,
228    GetDefinition,
229    GetDocumentHighlights,
230    GetHover,
231    GetReferences,
232    GetProjectSymbols,
233    JoinProject,
234    JoinProjectRequestCancelled,
235    LeaveProject,
236    OpenBufferById,
237    OpenBufferByPath,
238    OpenBufferForSymbol,
239    PerformRename,
240    PrepareRename,
241    ProjectUnshared,
242    RegisterProjectActivity,
243    ReloadBuffers,
244    RemoveProjectCollaborator,
245    RenameProjectEntry,
246    RequestJoinProject,
247    SaveBuffer,
248    SearchProject,
249    StartLanguageServer,
250    Unfollow,
251    UnregisterProject,
252    UpdateBuffer,
253    UpdateBufferFile,
254    UpdateDiagnosticSummary,
255    UpdateFollowers,
256    UpdateLanguageServer,
257    UpdateProject,
258    UpdateWorktree,
259    UpdateWorktreeExtensions,
260);
261
262entity_messages!(channel_id, ChannelMessageSent);
263
264const MAX_BUFFER_LEN: usize = 1 * 1024 * 1024;
265
266/// A stream of protobuf messages.
267pub struct MessageStream<S> {
268    stream: S,
269    encoding_buffer: Vec<u8>,
270}
271
272#[derive(Debug)]
273pub enum Message {
274    Envelope(Envelope),
275    Ping,
276    Pong,
277}
278
279impl<S> MessageStream<S> {
280    pub fn new(stream: S) -> Self {
281        Self {
282            stream,
283            encoding_buffer: Vec::new(),
284        }
285    }
286
287    pub fn inner_mut(&mut self) -> &mut S {
288        &mut self.stream
289    }
290}
291
292impl<S> MessageStream<S>
293where
294    S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
295{
296    pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
297        #[cfg(any(test, feature = "test-support"))]
298        const COMPRESSION_LEVEL: i32 = -7;
299
300        #[cfg(not(any(test, feature = "test-support")))]
301        const COMPRESSION_LEVEL: i32 = 4;
302
303        match message {
304            Message::Envelope(message) => {
305                self.encoding_buffer.reserve(message.encoded_len());
306                message
307                    .encode(&mut self.encoding_buffer)
308                    .map_err(|err| io::Error::from(err))?;
309                let buffer =
310                    zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
311                        .unwrap();
312
313                self.encoding_buffer.clear();
314                self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
315                self.stream.send(WebSocketMessage::Binary(buffer)).await?;
316            }
317            Message::Ping => {
318                self.stream
319                    .send(WebSocketMessage::Ping(Default::default()))
320                    .await?;
321            }
322            Message::Pong => {
323                self.stream
324                    .send(WebSocketMessage::Pong(Default::default()))
325                    .await?;
326            }
327        }
328
329        Ok(())
330    }
331}
332
333impl<S> MessageStream<S>
334where
335    S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
336{
337    pub async fn read(&mut self) -> Result<Message, anyhow::Error> {
338        while let Some(bytes) = self.stream.next().await {
339            match bytes? {
340                WebSocketMessage::Binary(bytes) => {
341                    zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
342                    let envelope = Envelope::decode(self.encoding_buffer.as_slice())
343                        .map_err(io::Error::from)?;
344
345                    self.encoding_buffer.clear();
346                    self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
347                    return Ok(Message::Envelope(envelope));
348                }
349                WebSocketMessage::Ping(_) => return Ok(Message::Ping),
350                WebSocketMessage::Pong(_) => return Ok(Message::Pong),
351                WebSocketMessage::Close(_) => break,
352                _ => {}
353            }
354        }
355        Err(anyhow!("connection closed"))
356    }
357}
358
359impl Into<SystemTime> for Timestamp {
360    fn into(self) -> SystemTime {
361        UNIX_EPOCH
362            .checked_add(Duration::new(self.seconds, self.nanos))
363            .unwrap()
364    }
365}
366
367impl From<SystemTime> for Timestamp {
368    fn from(time: SystemTime) -> Self {
369        let duration = time.duration_since(UNIX_EPOCH).unwrap();
370        Self {
371            seconds: duration.as_secs(),
372            nanos: duration.subsec_nanos(),
373        }
374    }
375}
376
377impl From<u128> for Nonce {
378    fn from(nonce: u128) -> Self {
379        let upper_half = (nonce >> 64) as u64;
380        let lower_half = nonce as u64;
381        Self {
382            upper_half,
383            lower_half,
384        }
385    }
386}
387
388impl From<Nonce> for u128 {
389    fn from(nonce: Nonce) -> Self {
390        let upper_half = (nonce.upper_half as u128) << 64;
391        let lower_half = nonce.lower_half as u128;
392        upper_half | lower_half
393    }
394}
395
396pub fn split_worktree_update(
397    mut message: UpdateWorktree,
398    max_chunk_size: usize,
399) -> impl Iterator<Item = UpdateWorktree> {
400    let mut done = false;
401    iter::from_fn(move || {
402        if done {
403            return None;
404        }
405
406        let chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
407        let updated_entries = message.updated_entries.drain(..chunk_size).collect();
408        done = message.updated_entries.is_empty();
409        Some(UpdateWorktree {
410            project_id: message.project_id,
411            worktree_id: message.worktree_id,
412            root_name: message.root_name.clone(),
413            updated_entries,
414            removed_entries: mem::take(&mut message.removed_entries),
415            scan_id: message.scan_id,
416            is_last_update: done && message.is_last_update,
417        })
418    })
419}
420
421#[cfg(test)]
422mod tests {
423    use super::*;
424
425    #[gpui::test]
426    async fn test_buffer_size() {
427        let (tx, rx) = futures::channel::mpsc::unbounded();
428        let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
429        sink.write(Message::Envelope(Envelope {
430            payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
431                root_name: "abcdefg".repeat(10),
432                ..Default::default()
433            })),
434            ..Default::default()
435        }))
436        .await
437        .unwrap();
438        assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
439        sink.write(Message::Envelope(Envelope {
440            payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
441                root_name: "abcdefg".repeat(1000000),
442                ..Default::default()
443            })),
444            ..Default::default()
445        }))
446        .await
447        .unwrap();
448        assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
449
450        let mut stream = MessageStream::new(rx.map(|msg| anyhow::Ok(msg)));
451        stream.read().await.unwrap();
452        assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
453        stream.read().await.unwrap();
454        assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
455    }
456}