proto.rs

  1use super::{entity_messages, messages, request_messages, ConnectionId, PeerId, TypedEnvelope};
  2use anyhow::{anyhow, Result};
  3use async_tungstenite::tungstenite::Message as WebSocketMessage;
  4use futures::{SinkExt as _, StreamExt as _};
  5use prost::Message as _;
  6use serde::Serialize;
  7use std::any::{Any, TypeId};
  8use std::{cmp, iter, mem};
  9use std::{
 10    fmt::Debug,
 11    io,
 12    time::{Duration, SystemTime, UNIX_EPOCH},
 13};
 14
// Textually includes `zed.messages.rs` from the build's `OUT_DIR`.
// NOTE(review): presumably the prost-generated protobuf types (`Envelope`,
// `UpdateWorktree`, `Timestamp`, `Nonce`, ...) written by the build script — confirm.
include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
 16
/// A message type that can be converted to and from an [`Envelope`].
///
/// NOTE(review): implementations are presumably generated by the `messages!`
/// macro for every listed message type — confirm against the macro definition.
pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
    /// Name of the message type; surfaced through
    /// [`AnyTypedEnvelope::payload_type_name`].
    const NAME: &'static str;
    /// Priority class of the message; [`AnyTypedEnvelope::is_background`]
    /// reports whether this is [`MessagePriority::Background`].
    const PRIORITY: MessagePriority;
    /// Consumes the message and wraps it in an [`Envelope`].
    ///
    /// NOTE(review): `id`, `responding_to` and `original_sender_id` presumably
    /// populate the envelope fields of the same names — confirm.
    fn into_envelope(
        self,
        id: u32,
        responding_to: Option<u32>,
        original_sender_id: Option<u32>,
    ) -> Envelope;
    /// Attempts to extract a message of this type from `envelope`.
    ///
    /// NOTE(review): presumably returns `None` when the envelope's payload is
    /// of a different message type — confirm.
    fn from_envelope(envelope: Envelope) -> Option<Self>;
}
 28
/// An [`EnvelopedMessage`] that is addressed to a particular remote entity.
pub trait EntityMessage: EnvelopedMessage {
    /// Returns the id of the remote entity this message refers to.
    ///
    /// NOTE(review): presumably read from the field named in the
    /// `entity_messages!` invocation (e.g. `project_id`) — confirm.
    fn remote_entity_id(&self) -> u64;
}
 32
/// An [`EnvelopedMessage`] that has an associated response message type.
pub trait RequestMessage: EnvelopedMessage {
    /// The message type associated with this request as its response.
    type Response: EnvelopedMessage;
}
 36
/// Type-erased view of a `TypedEnvelope<T>`, so that envelopes carrying
/// different payload types can be handled through one trait object.
pub trait AnyTypedEnvelope: 'static + Send + Sync {
    /// Returns the [`TypeId`] of the payload type.
    fn payload_type_id(&self) -> TypeId;
    /// Returns the payload type's name.
    fn payload_type_name(&self) -> &'static str;
    /// Borrows the envelope as [`Any`] so it can be downcast by reference.
    fn as_any(&self) -> &dyn Any;
    /// Converts the boxed envelope into a boxed [`Any`] so it can be downcast by value.
    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
    /// Returns `true` if the payload type has background priority.
    fn is_background(&self) -> bool;
    /// Returns the original sender recorded on the envelope, if any.
    fn original_sender_id(&self) -> Option<PeerId>;
}
 45
/// Priority class assigned to each message type via [`EnvelopedMessage::PRIORITY`].
///
/// NOTE(review): how the two classes are scheduled is decided by code outside
/// this file — confirm with the peer/connection implementation.
pub enum MessagePriority {
    /// Foreground priority.
    Foreground,
    /// Background priority; reported by [`AnyTypedEnvelope::is_background`].
    Background,
}
 50
 51impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
 52    fn payload_type_id(&self) -> TypeId {
 53        TypeId::of::<T>()
 54    }
 55
 56    fn payload_type_name(&self) -> &'static str {
 57        T::NAME
 58    }
 59
 60    fn as_any(&self) -> &dyn Any {
 61        self
 62    }
 63
 64    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
 65        self
 66    }
 67
 68    fn is_background(&self) -> bool {
 69        matches!(T::PRIORITY, MessagePriority::Background)
 70    }
 71
 72    fn original_sender_id(&self) -> Option<PeerId> {
 73        self.original_sender_id
 74    }
 75}
 76
// Lists every message type together with its `MessagePriority` variant.
// NOTE(review): the `messages!` macro (imported from `super`) presumably
// generates the `EnvelopedMessage` impl for each entry, using the second
// element as `PRIORITY` — confirm against the macro definition.
messages!(
    (Ack, Foreground),
    (AddProjectCollaborator, Foreground),
    (ApplyCodeAction, Background),
    (ApplyCodeActionResponse, Background),
    (ApplyCompletionAdditionalEdits, Background),
    (ApplyCompletionAdditionalEditsResponse, Background),
    (BufferReloaded, Foreground),
    (BufferSaved, Foreground),
    (Call, Foreground),
    (CancelCall, Foreground),
    (ChannelMessageSent, Foreground),
    (CopyProjectEntry, Foreground),
    (CreateBufferForPeer, Foreground),
    (CreateProjectEntry, Foreground),
    (CreateRoom, Foreground),
    (CreateRoomResponse, Foreground),
    (DeclineCall, Foreground),
    (DeleteProjectEntry, Foreground),
    (Error, Foreground),
    (Follow, Foreground),
    (FollowResponse, Foreground),
    (FormatBuffers, Foreground),
    (FormatBuffersResponse, Foreground),
    (FuzzySearchUsers, Foreground),
    (GetChannelMessages, Foreground),
    (GetChannelMessagesResponse, Foreground),
    (GetChannels, Foreground),
    (GetChannelsResponse, Foreground),
    (GetCodeActions, Background),
    (GetCodeActionsResponse, Background),
    (GetHover, Background),
    (GetHoverResponse, Background),
    (GetCompletions, Background),
    (GetCompletionsResponse, Background),
    (GetDefinition, Background),
    (GetDefinitionResponse, Background),
    (GetTypeDefinition, Background),
    (GetTypeDefinitionResponse, Background),
    (GetDocumentHighlights, Background),
    (GetDocumentHighlightsResponse, Background),
    (GetReferences, Background),
    (GetReferencesResponse, Background),
    (GetProjectSymbols, Background),
    (GetProjectSymbolsResponse, Background),
    (GetUsers, Foreground),
    (IncomingCall, Foreground),
    (UsersResponse, Foreground),
    (JoinChannel, Foreground),
    (JoinChannelResponse, Foreground),
    (JoinProject, Foreground),
    (JoinProjectResponse, Foreground),
    (JoinRoom, Foreground),
    (JoinRoomResponse, Foreground),
    (LeaveChannel, Foreground),
    (LeaveProject, Foreground),
    (LeaveRoom, Foreground),
    (OpenBufferById, Background),
    (OpenBufferByPath, Background),
    (OpenBufferForSymbol, Background),
    (OpenBufferForSymbolResponse, Background),
    (OpenBufferResponse, Background),
    (PerformRename, Background),
    (PerformRenameResponse, Background),
    (PrepareRename, Background),
    (PrepareRenameResponse, Background),
    (ProjectEntryResponse, Foreground),
    (RemoveContact, Foreground),
    (Ping, Foreground),
    (RegisterProjectActivity, Foreground),
    (ReloadBuffers, Foreground),
    (ReloadBuffersResponse, Foreground),
    (RemoveProjectCollaborator, Foreground),
    (RenameProjectEntry, Foreground),
    (RequestContact, Foreground),
    (RespondToContactRequest, Foreground),
    (RoomUpdated, Foreground),
    (SaveBuffer, Foreground),
    (SearchProject, Background),
    (SearchProjectResponse, Background),
    (SendChannelMessage, Foreground),
    (SendChannelMessageResponse, Foreground),
    (ShareProject, Foreground),
    (ShareProjectResponse, Foreground),
    (ShowContacts, Foreground),
    (StartLanguageServer, Foreground),
    (Test, Foreground),
    (Unfollow, Foreground),
    (UnshareProject, Foreground),
    (UpdateBuffer, Foreground),
    (UpdateBufferFile, Foreground),
    (UpdateContacts, Foreground),
    (UpdateDiagnosticSummary, Foreground),
    (UpdateFollowers, Foreground),
    (UpdateInviteInfo, Foreground),
    (UpdateLanguageServer, Foreground),
    (UpdateParticipantLocation, Foreground),
    (UpdateProject, Foreground),
    (UpdateWorktree, Foreground),
    (UpdateWorktreeExtensions, Background),
);
178
// Pairs each request message type with the message type sent in response.
// NOTE(review): the `request_messages!` macro (imported from `super`)
// presumably generates `RequestMessage` impls with the second element as
// `Response` — confirm against the macro definition.
request_messages!(
    (ApplyCodeAction, ApplyCodeActionResponse),
    (
        ApplyCompletionAdditionalEdits,
        ApplyCompletionAdditionalEditsResponse
    ),
    (Call, Ack),
    (CopyProjectEntry, ProjectEntryResponse),
    (CreateProjectEntry, ProjectEntryResponse),
    (CreateRoom, CreateRoomResponse),
    (DeclineCall, Ack),
    (DeleteProjectEntry, ProjectEntryResponse),
    (Follow, FollowResponse),
    (FormatBuffers, FormatBuffersResponse),
    (GetChannelMessages, GetChannelMessagesResponse),
    (GetChannels, GetChannelsResponse),
    (GetCodeActions, GetCodeActionsResponse),
    (GetHover, GetHoverResponse),
    (GetCompletions, GetCompletionsResponse),
    (GetDefinition, GetDefinitionResponse),
    (GetTypeDefinition, GetTypeDefinitionResponse),
    (GetDocumentHighlights, GetDocumentHighlightsResponse),
    (GetReferences, GetReferencesResponse),
    (GetProjectSymbols, GetProjectSymbolsResponse),
    (FuzzySearchUsers, UsersResponse),
    (GetUsers, UsersResponse),
    (JoinChannel, JoinChannelResponse),
    (JoinProject, JoinProjectResponse),
    (JoinRoom, JoinRoomResponse),
    (IncomingCall, Ack),
    (OpenBufferById, OpenBufferResponse),
    (OpenBufferByPath, OpenBufferResponse),
    (OpenBufferForSymbol, OpenBufferForSymbolResponse),
    (Ping, Ack),
    (PerformRename, PerformRenameResponse),
    (PrepareRename, PrepareRenameResponse),
    (ReloadBuffers, ReloadBuffersResponse),
    (RequestContact, Ack),
    (RemoveContact, Ack),
    (RespondToContactRequest, Ack),
    (RenameProjectEntry, ProjectEntryResponse),
    (SaveBuffer, BufferSaved),
    (SearchProject, SearchProjectResponse),
    (SendChannelMessage, SendChannelMessageResponse),
    (ShareProject, ShareProjectResponse),
    (Test, Test),
    (UpdateBuffer, Ack),
    (UpdateParticipantLocation, Ack),
    (UpdateWorktree, Ack),
);
229
// Message types associated with a project.
// NOTE(review): the `entity_messages!` macro (imported from `super`)
// presumably generates `EntityMessage` impls whose `remote_entity_id` reads
// the field named by the first argument (`project_id`) — confirm.
entity_messages!(
    project_id,
    AddProjectCollaborator,
    ApplyCodeAction,
    ApplyCompletionAdditionalEdits,
    BufferReloaded,
    BufferSaved,
    CopyProjectEntry,
    CreateBufferForPeer,
    CreateProjectEntry,
    DeleteProjectEntry,
    Follow,
    FormatBuffers,
    GetCodeActions,
    GetCompletions,
    GetDefinition,
    GetTypeDefinition,
    GetDocumentHighlights,
    GetHover,
    GetReferences,
    GetProjectSymbols,
    JoinProject,
    LeaveProject,
    OpenBufferById,
    OpenBufferByPath,
    OpenBufferForSymbol,
    PerformRename,
    PrepareRename,
    RegisterProjectActivity,
    ReloadBuffers,
    RemoveProjectCollaborator,
    RenameProjectEntry,
    SaveBuffer,
    SearchProject,
    StartLanguageServer,
    Unfollow,
    UnshareProject,
    UpdateBuffer,
    UpdateBufferFile,
    UpdateDiagnosticSummary,
    UpdateFollowers,
    UpdateLanguageServer,
    UpdateProject,
    UpdateWorktree,
    UpdateWorktreeExtensions,
);
276
// Message types associated with a channel.
// NOTE(review): presumably keyed by the `channel_id` field in the same way the
// project-scoped invocation is keyed by `project_id` — confirm against the macro.
entity_messages!(channel_id, ChannelMessageSent);
278
/// Number of bytes in one kibibyte.
const KIB: usize = 1024;
/// Number of bytes in one mebibyte.
const MIB: usize = KIB * 1024;
/// Capacity (in bytes) that `MessageStream`'s scratch buffer is shrunk back
/// to after each envelope is written or read.
const MAX_BUFFER_LEN: usize = MIB;
282
/// A stream of protobuf messages.
///
/// Wraps an underlying WebSocket sink and/or stream `S` and converts between
/// [`Message`] values and zstd-compressed, protobuf-encoded binary frames.
pub struct MessageStream<S> {
    /// The wrapped transport.
    stream: S,
    /// Scratch buffer holding the uncompressed protobuf bytes of the envelope
    /// currently being written or read; cleared and shrunk after each message.
    encoding_buffer: Vec<u8>,
}
288
/// A message exchanged over a [`MessageStream`].
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum Message {
    /// A protobuf envelope, carried as a compressed binary WebSocket frame.
    Envelope(Envelope),
    /// A WebSocket ping frame (any payload is discarded on read).
    Ping,
    /// A WebSocket pong frame (any payload is discarded on read).
    Pong,
}
296
297impl<S> MessageStream<S> {
298    pub fn new(stream: S) -> Self {
299        Self {
300            stream,
301            encoding_buffer: Vec::new(),
302        }
303    }
304
305    pub fn inner_mut(&mut self) -> &mut S {
306        &mut self.stream
307    }
308}
309
310impl<S> MessageStream<S>
311where
312    S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
313{
314    pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
315        #[cfg(any(test, feature = "test-support"))]
316        const COMPRESSION_LEVEL: i32 = -7;
317
318        #[cfg(not(any(test, feature = "test-support")))]
319        const COMPRESSION_LEVEL: i32 = 4;
320
321        match message {
322            Message::Envelope(message) => {
323                self.encoding_buffer.reserve(message.encoded_len());
324                message
325                    .encode(&mut self.encoding_buffer)
326                    .map_err(io::Error::from)?;
327                let buffer =
328                    zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
329                        .unwrap();
330
331                self.encoding_buffer.clear();
332                self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
333                self.stream.send(WebSocketMessage::Binary(buffer)).await?;
334            }
335            Message::Ping => {
336                self.stream
337                    .send(WebSocketMessage::Ping(Default::default()))
338                    .await?;
339            }
340            Message::Pong => {
341                self.stream
342                    .send(WebSocketMessage::Pong(Default::default()))
343                    .await?;
344            }
345        }
346
347        Ok(())
348    }
349}
350
351impl<S> MessageStream<S>
352where
353    S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
354{
355    pub async fn read(&mut self) -> Result<Message, anyhow::Error> {
356        while let Some(bytes) = self.stream.next().await {
357            match bytes? {
358                WebSocketMessage::Binary(bytes) => {
359                    zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
360                    let envelope = Envelope::decode(self.encoding_buffer.as_slice())
361                        .map_err(io::Error::from)?;
362
363                    self.encoding_buffer.clear();
364                    self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
365                    return Ok(Message::Envelope(envelope));
366                }
367                WebSocketMessage::Ping(_) => return Ok(Message::Ping),
368                WebSocketMessage::Pong(_) => return Ok(Message::Pong),
369                WebSocketMessage::Close(_) => break,
370                _ => {}
371            }
372        }
373        Err(anyhow!("connection closed"))
374    }
375}
376
377impl From<Timestamp> for SystemTime {
378    fn from(val: Timestamp) -> Self {
379        UNIX_EPOCH
380            .checked_add(Duration::new(val.seconds, val.nanos))
381            .unwrap()
382    }
383}
384
385impl From<SystemTime> for Timestamp {
386    fn from(time: SystemTime) -> Self {
387        let duration = time.duration_since(UNIX_EPOCH).unwrap();
388        Self {
389            seconds: duration.as_secs(),
390            nanos: duration.subsec_nanos(),
391        }
392    }
393}
394
395impl From<u128> for Nonce {
396    fn from(nonce: u128) -> Self {
397        let upper_half = (nonce >> 64) as u64;
398        let lower_half = nonce as u64;
399        Self {
400            upper_half,
401            lower_half,
402        }
403    }
404}
405
406impl From<Nonce> for u128 {
407    fn from(nonce: Nonce) -> Self {
408        let upper_half = (nonce.upper_half as u128) << 64;
409        let lower_half = nonce.lower_half as u128;
410        upper_half | lower_half
411    }
412}
413
414pub fn split_worktree_update(
415    mut message: UpdateWorktree,
416    max_chunk_size: usize,
417) -> impl Iterator<Item = UpdateWorktree> {
418    let mut done = false;
419    iter::from_fn(move || {
420        if done {
421            return None;
422        }
423
424        let chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
425        let updated_entries = message.updated_entries.drain(..chunk_size).collect();
426        done = message.updated_entries.is_empty();
427        Some(UpdateWorktree {
428            project_id: message.project_id,
429            worktree_id: message.worktree_id,
430            root_name: message.root_name.clone(),
431            updated_entries,
432            removed_entries: mem::take(&mut message.removed_entries),
433            scan_id: message.scan_id,
434            is_last_update: done && message.is_last_update,
435        })
436    })
437}
438
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an envelope message carrying an `UpdateWorktree` whose
    /// `root_name` is "abcdefg" repeated `repeats` times.
    fn worktree_message(repeats: usize) -> Message {
        let update = UpdateWorktree {
            root_name: "abcdefg".repeat(repeats),
            ..Default::default()
        };
        Message::Envelope(Envelope {
            payload: Some(envelope::Payload::UpdateWorktree(update)),
            ..Default::default()
        })
    }

    /// After each write and each read — of a small and then a very large
    /// envelope — the scratch buffer's capacity must not exceed `MAX_BUFFER_LEN`.
    #[gpui::test]
    async fn test_buffer_size() {
        let (tx, rx) = futures::channel::mpsc::unbounded();

        let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
        for repeats in [10, 1000000] {
            sink.write(worktree_message(repeats)).await.unwrap();
            assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
        }

        let mut stream = MessageStream::new(rx.map(anyhow::Ok));
        for _ in 0..2 {
            stream.read().await.unwrap();
            assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
        }
    }
}