proto.rs

  1#![allow(non_snake_case)]
  2
  3use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
  4use anyhow::{anyhow, Result};
  5use async_tungstenite::tungstenite::Message as WebSocketMessage;
  6use collections::HashMap;
  7use futures::{SinkExt as _, StreamExt as _};
  8use prost::Message as _;
  9use serde::Serialize;
 10use std::any::{Any, TypeId};
 11use std::{
 12    cmp,
 13    fmt::Debug,
 14    io, iter,
 15    time::{Duration, SystemTime, UNIX_EPOCH},
 16};
 17use std::{fmt, mem};
 18
// Textually include `zed.messages.rs` from the build's `OUT_DIR`. The message
// types used throughout this file (`Envelope`, `PeerId`, `UpdateWorktree`, ...)
// are not declared here, so they presumably come from this generated file
// (likely prost output — TODO confirm against the build script).
include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
 20
/// A message type that can be converted into an [`Envelope`] and extracted
/// back out of one.
///
/// NOTE(review): implementations are presumably generated by the `messages!`
/// invocation further down in this file — confirm against the macro definition.
pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
    /// Static name of the message type; `AnyTypedEnvelope::payload_type_name`
    /// returns this value.
    const NAME: &'static str;
    /// Priority class of the message type; `AnyTypedEnvelope::is_background`
    /// reports whether it is `MessagePriority::Background`.
    const PRIORITY: MessagePriority;
    /// Consumes the message and builds an `Envelope` from it.
    ///
    /// * `id` - id for the resulting envelope.
    /// * `responding_to` - id of the message being answered, if any.
    /// * `original_sender_id` - peer that originally sent the message, if any.
    fn into_envelope(
        self,
        id: u32,
        responding_to: Option<u32>,
        original_sender_id: Option<PeerId>,
    ) -> Envelope;
    /// Tries to extract a message of this type from `envelope`; returns `None`
    /// when that is not possible (presumably when the payload holds a different
    /// message type — verify against the implementations).
    fn from_envelope(envelope: Envelope) -> Option<Self>;
}
 32
/// An enveloped message that carries the id of a remote entity.
///
/// NOTE(review): implementations presumably come from the `entity_messages!`
/// invocations below, which name the id field (`project_id`, `channel_id`) —
/// confirm against the macro definition.
pub trait EntityMessage: EnvelopedMessage {
    /// Returns the id of the remote entity associated with this message.
    fn remote_entity_id(&self) -> u64;
}
 36
/// An enveloped message that has an associated response message type.
pub trait RequestMessage: EnvelopedMessage {
    /// The message type associated with this request as its response
    /// (see the pairs listed in `request_messages!`).
    type Response: EnvelopedMessage;
}
 40
/// Object-safe view of a typed envelope that hides the concrete payload type,
/// so envelopes with different payloads can be handled uniformly.
pub trait AnyTypedEnvelope: 'static + Send + Sync {
    /// `TypeId` of the payload type.
    fn payload_type_id(&self) -> TypeId;
    /// Static name of the payload type.
    fn payload_type_name(&self) -> &'static str;
    /// Borrows the envelope as `&dyn Any` so it can be downcast.
    fn as_any(&self) -> &dyn Any;
    /// Converts the boxed envelope into a boxed `Any` so it can be downcast by value.
    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
    /// Whether the payload type is a background-priority message.
    fn is_background(&self) -> bool;
    /// Peer that originally sent the message, if recorded.
    fn original_sender_id(&self) -> Option<PeerId>;
    /// Connection the envelope was received from.
    fn sender_id(&self) -> ConnectionId;
    /// Id of the message carried by the envelope.
    fn message_id(&self) -> u32;
}
 51
/// Priority class assigned to each message type via `EnvelopedMessage::PRIORITY`.
pub enum MessagePriority {
    /// Any message type that is not `Background`.
    Foreground,
    /// Message types for which `AnyTypedEnvelope::is_background` returns `true`.
    Background,
}
 56
 57impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
 58    fn payload_type_id(&self) -> TypeId {
 59        TypeId::of::<T>()
 60    }
 61
 62    fn payload_type_name(&self) -> &'static str {
 63        T::NAME
 64    }
 65
 66    fn as_any(&self) -> &dyn Any {
 67        self
 68    }
 69
 70    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
 71        self
 72    }
 73
 74    fn is_background(&self) -> bool {
 75        matches!(T::PRIORITY, MessagePriority::Background)
 76    }
 77
 78    fn original_sender_id(&self) -> Option<PeerId> {
 79        self.original_sender_id
 80    }
 81
 82    fn sender_id(&self) -> ConnectionId {
 83        self.sender_id
 84    }
 85
 86    fn message_id(&self) -> u32 {
 87        self.message_id
 88    }
 89}
 90
 91impl PeerId {
 92    pub fn from_u64(peer_id: u64) -> Self {
 93        let owner_id = (peer_id >> 32) as u32;
 94        let id = peer_id as u32;
 95        Self { owner_id, id }
 96    }
 97
 98    pub fn as_u64(self) -> u64 {
 99        ((self.owner_id as u64) << 32) | (self.id as u64)
100    }
101}
102
// `PeerId` is passed and returned by value throughout this file (e.g. `as_u64(self)`),
// so it is marked `Copy`.
impl Copy for PeerId {}
104
// Marks `PeerId` equality as total; `Eq` is a supertrait of the `Ord` impl below.
impl Eq for PeerId {}
106
107impl Ord for PeerId {
108    fn cmp(&self, other: &Self) -> cmp::Ordering {
109        self.owner_id
110            .cmp(&other.owner_id)
111            .then_with(|| self.id.cmp(&other.id))
112    }
113}
114
115impl PartialOrd for PeerId {
116    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
117        Some(self.cmp(other))
118    }
119}
120
121impl std::hash::Hash for PeerId {
122    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
123        self.owner_id.hash(state);
124        self.id.hash(state);
125    }
126}
127
128impl fmt::Display for PeerId {
129    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130        write!(f, "{}/{}", self.owner_id, self.id)
131    }
132}
133
// Registry of every message type together with its `MessagePriority` variant.
// NOTE(review): the `messages!` macro is defined outside this file; it presumably
// implements `EnvelopedMessage` (with `NAME` and `PRIORITY`) for each entry —
// confirm against the macro definition.
messages!(
    (Ack, Foreground),
    (AddProjectCollaborator, Foreground),
    (ApplyCodeAction, Background),
    (ApplyCodeActionResponse, Background),
    (ApplyCompletionAdditionalEdits, Background),
    (ApplyCompletionAdditionalEditsResponse, Background),
    (BufferReloaded, Foreground),
    (BufferSaved, Foreground),
    (Call, Foreground),
    (CallCanceled, Foreground),
    (CancelCall, Foreground),
    (CopyProjectEntry, Foreground),
    (CreateBufferForPeer, Foreground),
    (CreateChannel, Foreground),
    (ChannelResponse, Foreground),
    (CreateProjectEntry, Foreground),
    (CreateRoom, Foreground),
    (CreateRoomResponse, Foreground),
    (DeclineCall, Foreground),
    (DeleteProjectEntry, Foreground),
    (Error, Foreground),
    (ExpandProjectEntry, Foreground),
    (Follow, Foreground),
    (FollowResponse, Foreground),
    (FormatBuffers, Foreground),
    (FormatBuffersResponse, Foreground),
    (FuzzySearchUsers, Foreground),
    (GetCodeActions, Background),
    (GetCodeActionsResponse, Background),
    (GetHover, Background),
    (GetHoverResponse, Background),
    (GetCompletions, Background),
    (GetCompletionsResponse, Background),
    (GetDefinition, Background),
    (GetDefinitionResponse, Background),
    (GetTypeDefinition, Background),
    (GetTypeDefinitionResponse, Background),
    (GetDocumentHighlights, Background),
    (GetDocumentHighlightsResponse, Background),
    (GetReferences, Background),
    (GetReferencesResponse, Background),
    (GetProjectSymbols, Background),
    (GetProjectSymbolsResponse, Background),
    (GetUsers, Foreground),
    (Hello, Foreground),
    (IncomingCall, Foreground),
    (InviteChannelMember, Foreground),
    (UsersResponse, Foreground),
    (JoinProject, Foreground),
    (JoinProjectResponse, Foreground),
    (JoinRoom, Foreground),
    (JoinRoomResponse, Foreground),
    (LeaveProject, Foreground),
    (LeaveRoom, Foreground),
    (OpenBufferById, Background),
    (OpenBufferByPath, Background),
    (OpenBufferForSymbol, Background),
    (OpenBufferForSymbolResponse, Background),
    (OpenBufferResponse, Background),
    (PerformRename, Background),
    (PerformRenameResponse, Background),
    (OnTypeFormatting, Background),
    (OnTypeFormattingResponse, Background),
    (InlayHints, Background),
    (InlayHintsResponse, Background),
    (RefreshInlayHints, Foreground),
    (Ping, Foreground),
    (PrepareRename, Background),
    (PrepareRenameResponse, Background),
    (ExpandProjectEntryResponse, Foreground),
    (ProjectEntryResponse, Foreground),
    (RejoinRoom, Foreground),
    (RejoinRoomResponse, Foreground),
    (RemoveContact, Foreground),
    (RemoveChannelMember, Foreground),
    (ReloadBuffers, Foreground),
    (ReloadBuffersResponse, Foreground),
    (RemoveProjectCollaborator, Foreground),
    (RenameProjectEntry, Foreground),
    (RequestContact, Foreground),
    (RespondToContactRequest, Foreground),
    (RespondToChannelInvite, Foreground),
    (JoinChannel, Foreground),
    (RoomUpdated, Foreground),
    (SaveBuffer, Foreground),
    (RenameChannel, Foreground),
    (SetChannelMemberAdmin, Foreground),
    (SearchProject, Background),
    (SearchProjectResponse, Background),
    (ShareProject, Foreground),
    (ShareProjectResponse, Foreground),
    (ShowContacts, Foreground),
    (StartLanguageServer, Foreground),
    (SynchronizeBuffers, Foreground),
    (SynchronizeBuffersResponse, Foreground),
    (Test, Foreground),
    (Unfollow, Foreground),
    (UnshareProject, Foreground),
    (UpdateBuffer, Foreground),
    (UpdateBufferFile, Foreground),
    (UpdateContacts, Foreground),
    (RemoveChannel, Foreground),
    (UpdateChannels, Foreground),
    (UpdateDiagnosticSummary, Foreground),
    (UpdateFollowers, Foreground),
    (UpdateInviteInfo, Foreground),
    (UpdateLanguageServer, Foreground),
    (UpdateParticipantLocation, Foreground),
    (UpdateProject, Foreground),
    (UpdateProjectCollaborator, Foreground),
    (UpdateWorktree, Foreground),
    (UpdateWorktreeSettings, Foreground),
    (UpdateDiffBase, Foreground),
    (GetPrivateUserInfo, Foreground),
    (GetPrivateUserInfoResponse, Foreground),
    (GetChannelMembers, Foreground),
    (GetChannelMembersResponse, Foreground),
    (JoinChannelBuffer, Foreground),
    (JoinChannelBufferResponse, Foreground),
    (LeaveChannelBuffer, Background),
    (UpdateChannelBuffer, Foreground),
    (RemoveChannelBufferCollaborator, Foreground),
    (AddChannelBufferCollaborator, Foreground),
);
259
// Pairs of `(request, response)` message types.
// NOTE(review): the `request_messages!` macro is defined outside this file; it
// presumably implements `RequestMessage` for the first type of each pair with
// the second as `Response` — confirm against the macro definition.
request_messages!(
    (ApplyCodeAction, ApplyCodeActionResponse),
    (
        ApplyCompletionAdditionalEdits,
        ApplyCompletionAdditionalEditsResponse
    ),
    (Call, Ack),
    (CancelCall, Ack),
    (CopyProjectEntry, ProjectEntryResponse),
    (CreateProjectEntry, ProjectEntryResponse),
    (CreateRoom, CreateRoomResponse),
    (CreateChannel, ChannelResponse),
    (DeclineCall, Ack),
    (DeleteProjectEntry, ProjectEntryResponse),
    (ExpandProjectEntry, ExpandProjectEntryResponse),
    (Follow, FollowResponse),
    (FormatBuffers, FormatBuffersResponse),
    (GetCodeActions, GetCodeActionsResponse),
    (GetHover, GetHoverResponse),
    (GetCompletions, GetCompletionsResponse),
    (GetDefinition, GetDefinitionResponse),
    (GetTypeDefinition, GetTypeDefinitionResponse),
    (GetDocumentHighlights, GetDocumentHighlightsResponse),
    (GetReferences, GetReferencesResponse),
    (GetPrivateUserInfo, GetPrivateUserInfoResponse),
    (GetProjectSymbols, GetProjectSymbolsResponse),
    (FuzzySearchUsers, UsersResponse),
    (GetUsers, UsersResponse),
    (InviteChannelMember, Ack),
    (JoinProject, JoinProjectResponse),
    (JoinRoom, JoinRoomResponse),
    (LeaveRoom, Ack),
    (RejoinRoom, RejoinRoomResponse),
    (IncomingCall, Ack),
    (OpenBufferById, OpenBufferResponse),
    (OpenBufferByPath, OpenBufferResponse),
    (OpenBufferForSymbol, OpenBufferForSymbolResponse),
    (Ping, Ack),
    (PerformRename, PerformRenameResponse),
    (PrepareRename, PrepareRenameResponse),
    (OnTypeFormatting, OnTypeFormattingResponse),
    (InlayHints, InlayHintsResponse),
    (RefreshInlayHints, Ack),
    (ReloadBuffers, ReloadBuffersResponse),
    (RequestContact, Ack),
    (RemoveChannelMember, Ack),
    (RemoveContact, Ack),
    (RespondToContactRequest, Ack),
    (RespondToChannelInvite, Ack),
    (SetChannelMemberAdmin, Ack),
    (GetChannelMembers, GetChannelMembersResponse),
    (JoinChannel, JoinRoomResponse),
    (RemoveChannel, Ack),
    (RenameProjectEntry, ProjectEntryResponse),
    (RenameChannel, ChannelResponse),
    (SaveBuffer, BufferSaved),
    (SearchProject, SearchProjectResponse),
    (ShareProject, ShareProjectResponse),
    (SynchronizeBuffers, SynchronizeBuffersResponse),
    (Test, Test),
    (UpdateBuffer, Ack),
    (UpdateParticipantLocation, Ack),
    (UpdateProject, Ack),
    (UpdateWorktree, Ack),
    (JoinChannelBuffer, JoinChannelBufferResponse),
    (LeaveChannelBuffer, Ack)
);
327
// Message types tied to a project. The first argument names a field
// (`project_id`); the remaining arguments are message types.
// NOTE(review): the `entity_messages!` macro is defined outside this file; it
// presumably implements `EntityMessage::remote_entity_id` by returning that
// field — confirm against the macro definition.
entity_messages!(
    project_id,
    AddProjectCollaborator,
    ApplyCodeAction,
    ApplyCompletionAdditionalEdits,
    BufferReloaded,
    BufferSaved,
    CopyProjectEntry,
    CreateBufferForPeer,
    CreateProjectEntry,
    DeleteProjectEntry,
    ExpandProjectEntry,
    Follow,
    FormatBuffers,
    GetCodeActions,
    GetCompletions,
    GetDefinition,
    GetTypeDefinition,
    GetDocumentHighlights,
    GetHover,
    GetReferences,
    GetProjectSymbols,
    JoinProject,
    LeaveProject,
    OpenBufferById,
    OpenBufferByPath,
    OpenBufferForSymbol,
    PerformRename,
    OnTypeFormatting,
    InlayHints,
    RefreshInlayHints,
    PrepareRename,
    ReloadBuffers,
    RemoveProjectCollaborator,
    RenameProjectEntry,
    SaveBuffer,
    SearchProject,
    StartLanguageServer,
    SynchronizeBuffers,
    Unfollow,
    UnshareProject,
    UpdateBuffer,
    UpdateBufferFile,
    UpdateDiagnosticSummary,
    UpdateFollowers,
    UpdateLanguageServer,
    UpdateProject,
    UpdateProjectCollaborator,
    UpdateWorktree,
    UpdateWorktreeSettings,
    UpdateDiffBase
);
380
// Message types tied to a channel; same macro as the project list, but the
// named field is `channel_id`.
// NOTE(review): presumably `remote_entity_id` returns `channel_id` for these
// types — confirm against the macro definition.
entity_messages!(
    channel_id,
    UpdateChannelBuffer,
    RemoveChannelBufferCollaborator,
    AddChannelBufferCollaborator
);
387
// Byte-size units.
const KIB: usize = 1024;
const MIB: usize = KIB * 1024;
// Capacity (in bytes) that `MessageStream` shrinks its reusable encoding buffer
// down to after each message, so one oversized message does not pin a large
// allocation.
const MAX_BUFFER_LEN: usize = MIB;
391
/// A stream of protobuf messages.
///
/// Wraps an underlying WebSocket sink and/or stream `S`. Envelopes are
/// zstd-compressed when written and decompressed when read.
pub struct MessageStream<S> {
    // Underlying transport; accessible through `inner_mut`.
    stream: S,
    // Scratch buffer reused across `read`/`write` calls for the uncompressed
    // protobuf bytes; cleared and shrunk to `MAX_BUFFER_LEN` after each message.
    encoding_buffer: Vec<u8>,
}
397
/// A unit of traffic exchanged over a `MessageStream`.
// `Envelope` is much larger than the unit variants; the size difference is
// accepted here rather than boxing the envelope.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum Message {
    /// A protobuf `Envelope`, sent as a compressed binary WebSocket frame.
    Envelope(Envelope),
    /// A WebSocket ping frame (payload is not preserved).
    Ping,
    /// A WebSocket pong frame (payload is not preserved).
    Pong,
}
405
406impl<S> MessageStream<S> {
407    pub fn new(stream: S) -> Self {
408        Self {
409            stream,
410            encoding_buffer: Vec::new(),
411        }
412    }
413
414    pub fn inner_mut(&mut self) -> &mut S {
415        &mut self.stream
416    }
417}
418
419impl<S> MessageStream<S>
420where
421    S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
422{
423    pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
424        #[cfg(any(test, feature = "test-support"))]
425        const COMPRESSION_LEVEL: i32 = -7;
426
427        #[cfg(not(any(test, feature = "test-support")))]
428        const COMPRESSION_LEVEL: i32 = 4;
429
430        match message {
431            Message::Envelope(message) => {
432                self.encoding_buffer.reserve(message.encoded_len());
433                message
434                    .encode(&mut self.encoding_buffer)
435                    .map_err(io::Error::from)?;
436                let buffer =
437                    zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
438                        .unwrap();
439
440                self.encoding_buffer.clear();
441                self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
442                self.stream.send(WebSocketMessage::Binary(buffer)).await?;
443            }
444            Message::Ping => {
445                self.stream
446                    .send(WebSocketMessage::Ping(Default::default()))
447                    .await?;
448            }
449            Message::Pong => {
450                self.stream
451                    .send(WebSocketMessage::Pong(Default::default()))
452                    .await?;
453            }
454        }
455
456        Ok(())
457    }
458}
459
460impl<S> MessageStream<S>
461where
462    S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
463{
464    pub async fn read(&mut self) -> Result<Message, anyhow::Error> {
465        while let Some(bytes) = self.stream.next().await {
466            match bytes? {
467                WebSocketMessage::Binary(bytes) => {
468                    zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
469                    let envelope = Envelope::decode(self.encoding_buffer.as_slice())
470                        .map_err(io::Error::from)?;
471
472                    self.encoding_buffer.clear();
473                    self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
474                    return Ok(Message::Envelope(envelope));
475                }
476                WebSocketMessage::Ping(_) => return Ok(Message::Ping),
477                WebSocketMessage::Pong(_) => return Ok(Message::Pong),
478                WebSocketMessage::Close(_) => break,
479                _ => {}
480            }
481        }
482        Err(anyhow!("connection closed"))
483    }
484}
485
486impl From<Timestamp> for SystemTime {
487    fn from(val: Timestamp) -> Self {
488        UNIX_EPOCH
489            .checked_add(Duration::new(val.seconds, val.nanos))
490            .unwrap()
491    }
492}
493
494impl From<SystemTime> for Timestamp {
495    fn from(time: SystemTime) -> Self {
496        let duration = time.duration_since(UNIX_EPOCH).unwrap();
497        Self {
498            seconds: duration.as_secs(),
499            nanos: duration.subsec_nanos(),
500        }
501    }
502}
503
504impl From<u128> for Nonce {
505    fn from(nonce: u128) -> Self {
506        let upper_half = (nonce >> 64) as u64;
507        let lower_half = nonce as u64;
508        Self {
509            upper_half,
510            lower_half,
511        }
512    }
513}
514
515impl From<Nonce> for u128 {
516    fn from(nonce: Nonce) -> Self {
517        let upper_half = (nonce.upper_half as u128) << 64;
518        let lower_half = nonce.lower_half as u128;
519        upper_half | lower_half
520    }
521}
522
523pub fn split_worktree_update(
524    mut message: UpdateWorktree,
525    max_chunk_size: usize,
526) -> impl Iterator<Item = UpdateWorktree> {
527    let mut done_files = false;
528
529    let mut repository_map = message
530        .updated_repositories
531        .into_iter()
532        .map(|repo| (repo.work_directory_id, repo))
533        .collect::<HashMap<_, _>>();
534
535    iter::from_fn(move || {
536        if done_files {
537            return None;
538        }
539
540        let updated_entries_chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
541        let updated_entries: Vec<_> = message
542            .updated_entries
543            .drain(..updated_entries_chunk_size)
544            .collect();
545
546        let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size);
547        let removed_entries = message
548            .removed_entries
549            .drain(..removed_entries_chunk_size)
550            .collect();
551
552        done_files = message.updated_entries.is_empty() && message.removed_entries.is_empty();
553
554        let mut updated_repositories = Vec::new();
555
556        if !repository_map.is_empty() {
557            for entry in &updated_entries {
558                if let Some(repo) = repository_map.remove(&entry.id) {
559                    updated_repositories.push(repo)
560                }
561            }
562        }
563
564        let removed_repositories = if done_files {
565            mem::take(&mut message.removed_repositories)
566        } else {
567            Default::default()
568        };
569
570        if done_files {
571            updated_repositories.extend(mem::take(&mut repository_map).into_values());
572        }
573
574        Some(UpdateWorktree {
575            project_id: message.project_id,
576            worktree_id: message.worktree_id,
577            root_name: message.root_name.clone(),
578            abs_path: message.abs_path.clone(),
579            updated_entries,
580            removed_entries,
581            scan_id: message.scan_id,
582            is_last_update: done_files && message.is_last_update,
583            updated_repositories,
584            removed_repositories,
585        })
586    })
587}
588
#[cfg(test)]
mod tests {
    use super::*;

    // Checks that the reusable encoding buffer never stays larger than
    // `MAX_BUFFER_LEN` after a write or a read, for both a small message and
    // one whose payload (7,000,000 bytes of `root_name`) exceeds that limit.
    #[gpui::test]
    async fn test_buffer_size() {
        let (tx, rx) = futures::channel::mpsc::unbounded();
        let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
        // Small message: 70 bytes of `root_name`.
        sink.write(Message::Envelope(Envelope {
            payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
                root_name: "abcdefg".repeat(10),
                ..Default::default()
            })),
            ..Default::default()
        }))
        .await
        .unwrap();
        assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
        // Large message: the buffer must grow past the limit, then shrink back.
        sink.write(Message::Envelope(Envelope {
            payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
                root_name: "abcdefg".repeat(1000000),
                ..Default::default()
            })),
            ..Default::default()
        }))
        .await
        .unwrap();
        assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);

        // Read both messages back through the receiving half of the channel.
        let mut stream = MessageStream::new(rx.map(anyhow::Ok));
        stream.read().await.unwrap();
        assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
        stream.read().await.unwrap();
        assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
    }

    // Round-trips `PeerId` through `as_u64`/`from_u64`, including the maximum
    // value of each 32-bit half.
    #[gpui::test]
    fn test_converting_peer_id_from_and_to_u64() {
        let peer_id = PeerId {
            owner_id: 10,
            id: 3,
        };
        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
        let peer_id = PeerId {
            owner_id: u32::MAX,
            id: 3,
        };
        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
        let peer_id = PeerId {
            owner_id: 10,
            id: u32::MAX,
        };
        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
        let peer_id = PeerId {
            owner_id: u32::MAX,
            id: u32::MAX,
        };
        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
    }
}