proto.rs

  1#![allow(non_snake_case)]
  2
  3use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
  4use anyhow::{anyhow, Result};
  5use async_tungstenite::tungstenite::Message as WebSocketMessage;
  6use collections::HashMap;
  7use futures::{SinkExt as _, StreamExt as _};
  8use prost::Message as _;
  9use serde::Serialize;
 10use std::any::{Any, TypeId};
 11use std::{
 12    cmp,
 13    fmt::Debug,
 14    io, iter,
 15    time::{Duration, SystemTime, UNIX_EPOCH},
 16};
 17use std::{fmt, mem};
 18
 19include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
 20
 21pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
 22    const NAME: &'static str;
 23    const PRIORITY: MessagePriority;
 24    fn into_envelope(
 25        self,
 26        id: u32,
 27        responding_to: Option<u32>,
 28        original_sender_id: Option<PeerId>,
 29    ) -> Envelope;
 30    fn from_envelope(envelope: Envelope) -> Option<Self>;
 31}
 32
 33pub trait EntityMessage: EnvelopedMessage {
 34    fn remote_entity_id(&self) -> u64;
 35}
 36
 37pub trait RequestMessage: EnvelopedMessage {
 38    type Response: EnvelopedMessage;
 39}
 40
 41pub trait AnyTypedEnvelope: 'static + Send + Sync {
 42    fn payload_type_id(&self) -> TypeId;
 43    fn payload_type_name(&self) -> &'static str;
 44    fn as_any(&self) -> &dyn Any;
 45    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
 46    fn is_background(&self) -> bool;
 47    fn original_sender_id(&self) -> Option<PeerId>;
 48    fn sender_id(&self) -> ConnectionId;
 49    fn message_id(&self) -> u32;
 50}
 51
 52pub enum MessagePriority {
 53    Foreground,
 54    Background,
 55}
 56
 57impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
 58    fn payload_type_id(&self) -> TypeId {
 59        TypeId::of::<T>()
 60    }
 61
 62    fn payload_type_name(&self) -> &'static str {
 63        T::NAME
 64    }
 65
 66    fn as_any(&self) -> &dyn Any {
 67        self
 68    }
 69
 70    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
 71        self
 72    }
 73
 74    fn is_background(&self) -> bool {
 75        matches!(T::PRIORITY, MessagePriority::Background)
 76    }
 77
 78    fn original_sender_id(&self) -> Option<PeerId> {
 79        self.original_sender_id
 80    }
 81
 82    fn sender_id(&self) -> ConnectionId {
 83        self.sender_id
 84    }
 85
 86    fn message_id(&self) -> u32 {
 87        self.message_id
 88    }
 89}
 90
 91impl PeerId {
 92    pub fn from_u64(peer_id: u64) -> Self {
 93        let owner_id = (peer_id >> 32) as u32;
 94        let id = peer_id as u32;
 95        Self { owner_id, id }
 96    }
 97
 98    pub fn as_u64(self) -> u64 {
 99        ((self.owner_id as u64) << 32) | (self.id as u64)
100    }
101}
102
103impl Copy for PeerId {}
104
105impl Eq for PeerId {}
106
107impl Ord for PeerId {
108    fn cmp(&self, other: &Self) -> cmp::Ordering {
109        self.owner_id
110            .cmp(&other.owner_id)
111            .then_with(|| self.id.cmp(&other.id))
112    }
113}
114
115impl PartialOrd for PeerId {
116    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
117        Some(self.cmp(other))
118    }
119}
120
121impl std::hash::Hash for PeerId {
122    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
123        self.owner_id.hash(state);
124        self.id.hash(state);
125    }
126}
127
128impl fmt::Display for PeerId {
129    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130        write!(f, "{}/{}", self.owner_id, self.id)
131    }
132}
133
134messages!(
135    (Ack, Foreground),
136    (AddProjectCollaborator, Foreground),
137    (ApplyCodeAction, Background),
138    (ApplyCodeActionResponse, Background),
139    (ApplyCompletionAdditionalEdits, Background),
140    (ApplyCompletionAdditionalEditsResponse, Background),
141    (BufferReloaded, Foreground),
142    (BufferSaved, Foreground),
143    (Call, Foreground),
144    (CallCanceled, Foreground),
145    (CancelCall, Foreground),
146    (CopyProjectEntry, Foreground),
147    (CreateBufferForPeer, Foreground),
148    (CreateChannel, Foreground),
149    (ChannelResponse, Foreground),
150    (ChannelMessageSent, Foreground),
151    (CreateProjectEntry, Foreground),
152    (CreateRoom, Foreground),
153    (CreateRoomResponse, Foreground),
154    (DeclineCall, Foreground),
155    (DeleteProjectEntry, Foreground),
156    (Error, Foreground),
157    (ExpandProjectEntry, Foreground),
158    (Follow, Foreground),
159    (FollowResponse, Foreground),
160    (FormatBuffers, Foreground),
161    (FormatBuffersResponse, Foreground),
162    (FuzzySearchUsers, Foreground),
163    (GetCodeActions, Background),
164    (GetCodeActionsResponse, Background),
165    (GetHover, Background),
166    (GetHoverResponse, Background),
167    (GetChannelMessages, Background),
168    (GetChannelMessagesResponse, Background),
169    (SendChannelMessage, Background),
170    (SendChannelMessageResponse, Background),
171    (GetCompletions, Background),
172    (GetCompletionsResponse, Background),
173    (GetDefinition, Background),
174    (GetDefinitionResponse, Background),
175    (GetTypeDefinition, Background),
176    (GetTypeDefinitionResponse, Background),
177    (GetDocumentHighlights, Background),
178    (GetDocumentHighlightsResponse, Background),
179    (GetReferences, Background),
180    (GetReferencesResponse, Background),
181    (GetProjectSymbols, Background),
182    (GetProjectSymbolsResponse, Background),
183    (GetUsers, Foreground),
184    (Hello, Foreground),
185    (IncomingCall, Foreground),
186    (InviteChannelMember, Foreground),
187    (UsersResponse, Foreground),
188    (JoinProject, Foreground),
189    (JoinProjectResponse, Foreground),
190    (JoinRoom, Foreground),
191    (JoinRoomResponse, Foreground),
192    (JoinChannelChat, Foreground),
193    (JoinChannelChatResponse, Foreground),
194    (LeaveChannelChat, Foreground),
195    (LeaveProject, Foreground),
196    (LeaveRoom, Foreground),
197    (OpenBufferById, Background),
198    (OpenBufferByPath, Background),
199    (OpenBufferForSymbol, Background),
200    (OpenBufferForSymbolResponse, Background),
201    (OpenBufferResponse, Background),
202    (PerformRename, Background),
203    (PerformRenameResponse, Background),
204    (OnTypeFormatting, Background),
205    (OnTypeFormattingResponse, Background),
206    (InlayHints, Background),
207    (InlayHintsResponse, Background),
208    (ResolveInlayHint, Background),
209    (ResolveInlayHintResponse, Background),
210    (RefreshInlayHints, Foreground),
211    (Ping, Foreground),
212    (PrepareRename, Background),
213    (PrepareRenameResponse, Background),
214    (ExpandProjectEntryResponse, Foreground),
215    (ProjectEntryResponse, Foreground),
216    (RejoinRoom, Foreground),
217    (RejoinRoomResponse, Foreground),
218    (RemoveContact, Foreground),
219    (RemoveChannelMember, Foreground),
220    (RemoveChannelMessage, Foreground),
221    (ReloadBuffers, Foreground),
222    (ReloadBuffersResponse, Foreground),
223    (RemoveProjectCollaborator, Foreground),
224    (RenameProjectEntry, Foreground),
225    (RequestContact, Foreground),
226    (RespondToContactRequest, Foreground),
227    (RespondToChannelInvite, Foreground),
228    (JoinChannel, Foreground),
229    (RoomUpdated, Foreground),
230    (SaveBuffer, Foreground),
231    (RenameChannel, Foreground),
232    (SetChannelMemberAdmin, Foreground),
233    (SearchProject, Background),
234    (SearchProjectResponse, Background),
235    (ShareProject, Foreground),
236    (ShareProjectResponse, Foreground),
237    (ShowContacts, Foreground),
238    (StartLanguageServer, Foreground),
239    (SynchronizeBuffers, Foreground),
240    (SynchronizeBuffersResponse, Foreground),
241    (RejoinChannelBuffers, Foreground),
242    (RejoinChannelBuffersResponse, Foreground),
243    (Test, Foreground),
244    (Unfollow, Foreground),
245    (UnshareProject, Foreground),
246    (UpdateBuffer, Foreground),
247    (UpdateBufferFile, Foreground),
248    (UpdateContacts, Foreground),
249    (DeleteChannel, Foreground),
250    (MoveChannel, Foreground),
251    (UpdateChannels, Foreground),
252    (UpdateDiagnosticSummary, Foreground),
253    (UpdateFollowers, Foreground),
254    (UpdateInviteInfo, Foreground),
255    (UpdateLanguageServer, Foreground),
256    (UpdateParticipantLocation, Foreground),
257    (UpdateProject, Foreground),
258    (UpdateProjectCollaborator, Foreground),
259    (UpdateWorktree, Foreground),
260    (UpdateWorktreeSettings, Foreground),
261    (UpdateDiffBase, Foreground),
262    (GetPrivateUserInfo, Foreground),
263    (GetPrivateUserInfoResponse, Foreground),
264    (GetChannelMembers, Foreground),
265    (GetChannelMembersResponse, Foreground),
266    (JoinChannelBuffer, Foreground),
267    (JoinChannelBufferResponse, Foreground),
268    (LeaveChannelBuffer, Background),
269    (UpdateChannelBuffer, Foreground),
270    (RemoveChannelBufferCollaborator, Foreground),
271    (AddChannelBufferCollaborator, Foreground),
272    (UpdateChannelBufferCollaborator, Foreground),
273);
274
275request_messages!(
276    (ApplyCodeAction, ApplyCodeActionResponse),
277    (
278        ApplyCompletionAdditionalEdits,
279        ApplyCompletionAdditionalEditsResponse
280    ),
281    (Call, Ack),
282    (CancelCall, Ack),
283    (CopyProjectEntry, ProjectEntryResponse),
284    (CreateProjectEntry, ProjectEntryResponse),
285    (CreateRoom, CreateRoomResponse),
286    (CreateChannel, ChannelResponse),
287    (DeclineCall, Ack),
288    (DeleteProjectEntry, ProjectEntryResponse),
289    (ExpandProjectEntry, ExpandProjectEntryResponse),
290    (Follow, FollowResponse),
291    (FormatBuffers, FormatBuffersResponse),
292    (GetCodeActions, GetCodeActionsResponse),
293    (GetHover, GetHoverResponse),
294    (GetCompletions, GetCompletionsResponse),
295    (GetDefinition, GetDefinitionResponse),
296    (GetTypeDefinition, GetTypeDefinitionResponse),
297    (GetDocumentHighlights, GetDocumentHighlightsResponse),
298    (GetReferences, GetReferencesResponse),
299    (GetPrivateUserInfo, GetPrivateUserInfoResponse),
300    (GetProjectSymbols, GetProjectSymbolsResponse),
301    (FuzzySearchUsers, UsersResponse),
302    (GetUsers, UsersResponse),
303    (InviteChannelMember, Ack),
304    (JoinProject, JoinProjectResponse),
305    (JoinRoom, JoinRoomResponse),
306    (JoinChannelChat, JoinChannelChatResponse),
307    (LeaveRoom, Ack),
308    (RejoinRoom, RejoinRoomResponse),
309    (IncomingCall, Ack),
310    (OpenBufferById, OpenBufferResponse),
311    (OpenBufferByPath, OpenBufferResponse),
312    (OpenBufferForSymbol, OpenBufferForSymbolResponse),
313    (Ping, Ack),
314    (PerformRename, PerformRenameResponse),
315    (PrepareRename, PrepareRenameResponse),
316    (OnTypeFormatting, OnTypeFormattingResponse),
317    (InlayHints, InlayHintsResponse),
318    (ResolveInlayHint, ResolveInlayHintResponse),
319    (RefreshInlayHints, Ack),
320    (ReloadBuffers, ReloadBuffersResponse),
321    (RequestContact, Ack),
322    (RemoveChannelMember, Ack),
323    (RemoveContact, Ack),
324    (RespondToContactRequest, Ack),
325    (RespondToChannelInvite, Ack),
326    (SetChannelMemberAdmin, Ack),
327    (SendChannelMessage, SendChannelMessageResponse),
328    (GetChannelMessages, GetChannelMessagesResponse),
329    (GetChannelMembers, GetChannelMembersResponse),
330    (JoinChannel, JoinRoomResponse),
331    (RemoveChannelMessage, Ack),
332    (DeleteChannel, Ack),
333    (RenameProjectEntry, ProjectEntryResponse),
334    (RenameChannel, ChannelResponse),
335    (MoveChannel, Ack),
336    (SaveBuffer, BufferSaved),
337    (SearchProject, SearchProjectResponse),
338    (ShareProject, ShareProjectResponse),
339    (SynchronizeBuffers, SynchronizeBuffersResponse),
340    (RejoinChannelBuffers, RejoinChannelBuffersResponse),
341    (Test, Test),
342    (UpdateBuffer, Ack),
343    (UpdateParticipantLocation, Ack),
344    (UpdateProject, Ack),
345    (UpdateWorktree, Ack),
346    (JoinChannelBuffer, JoinChannelBufferResponse),
347    (LeaveChannelBuffer, Ack)
348);
349
350entity_messages!(
351    project_id,
352    AddProjectCollaborator,
353    ApplyCodeAction,
354    ApplyCompletionAdditionalEdits,
355    BufferReloaded,
356    BufferSaved,
357    CopyProjectEntry,
358    CreateBufferForPeer,
359    CreateProjectEntry,
360    DeleteProjectEntry,
361    ExpandProjectEntry,
362    Follow,
363    FormatBuffers,
364    GetCodeActions,
365    GetCompletions,
366    GetDefinition,
367    GetTypeDefinition,
368    GetDocumentHighlights,
369    GetHover,
370    GetReferences,
371    GetProjectSymbols,
372    JoinProject,
373    LeaveProject,
374    OpenBufferById,
375    OpenBufferByPath,
376    OpenBufferForSymbol,
377    PerformRename,
378    OnTypeFormatting,
379    InlayHints,
380    ResolveInlayHint,
381    RefreshInlayHints,
382    PrepareRename,
383    ReloadBuffers,
384    RemoveProjectCollaborator,
385    RenameProjectEntry,
386    SaveBuffer,
387    SearchProject,
388    StartLanguageServer,
389    SynchronizeBuffers,
390    Unfollow,
391    UnshareProject,
392    UpdateBuffer,
393    UpdateBufferFile,
394    UpdateDiagnosticSummary,
395    UpdateFollowers,
396    UpdateLanguageServer,
397    UpdateProject,
398    UpdateProjectCollaborator,
399    UpdateWorktree,
400    UpdateWorktreeSettings,
401    UpdateDiffBase
402);
403
404entity_messages!(
405    channel_id,
406    ChannelMessageSent,
407    UpdateChannelBuffer,
408    RemoveChannelBufferCollaborator,
409    RemoveChannelMessage,
410    AddChannelBufferCollaborator,
411    UpdateChannelBufferCollaborator
412);
413
414const KIB: usize = 1024;
415const MIB: usize = KIB * 1024;
416const MAX_BUFFER_LEN: usize = MIB;
417
418/// A stream of protobuf messages.
419pub struct MessageStream<S> {
420    stream: S,
421    encoding_buffer: Vec<u8>,
422}
423
424#[allow(clippy::large_enum_variant)]
425#[derive(Debug)]
426pub enum Message {
427    Envelope(Envelope),
428    Ping,
429    Pong,
430}
431
432impl<S> MessageStream<S> {
433    pub fn new(stream: S) -> Self {
434        Self {
435            stream,
436            encoding_buffer: Vec::new(),
437        }
438    }
439
440    pub fn inner_mut(&mut self) -> &mut S {
441        &mut self.stream
442    }
443}
444
445impl<S> MessageStream<S>
446where
447    S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
448{
449    pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
450        #[cfg(any(test, feature = "test-support"))]
451        const COMPRESSION_LEVEL: i32 = -7;
452
453        #[cfg(not(any(test, feature = "test-support")))]
454        const COMPRESSION_LEVEL: i32 = 4;
455
456        match message {
457            Message::Envelope(message) => {
458                self.encoding_buffer.reserve(message.encoded_len());
459                message
460                    .encode(&mut self.encoding_buffer)
461                    .map_err(io::Error::from)?;
462                let buffer =
463                    zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
464                        .unwrap();
465
466                self.encoding_buffer.clear();
467                self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
468                self.stream.send(WebSocketMessage::Binary(buffer)).await?;
469            }
470            Message::Ping => {
471                self.stream
472                    .send(WebSocketMessage::Ping(Default::default()))
473                    .await?;
474            }
475            Message::Pong => {
476                self.stream
477                    .send(WebSocketMessage::Pong(Default::default()))
478                    .await?;
479            }
480        }
481
482        Ok(())
483    }
484}
485
486impl<S> MessageStream<S>
487where
488    S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
489{
490    pub async fn read(&mut self) -> Result<Message, anyhow::Error> {
491        while let Some(bytes) = self.stream.next().await {
492            match bytes? {
493                WebSocketMessage::Binary(bytes) => {
494                    zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
495                    let envelope = Envelope::decode(self.encoding_buffer.as_slice())
496                        .map_err(io::Error::from)?;
497
498                    self.encoding_buffer.clear();
499                    self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
500                    return Ok(Message::Envelope(envelope));
501                }
502                WebSocketMessage::Ping(_) => return Ok(Message::Ping),
503                WebSocketMessage::Pong(_) => return Ok(Message::Pong),
504                WebSocketMessage::Close(_) => break,
505                _ => {}
506            }
507        }
508        Err(anyhow!("connection closed"))
509    }
510}
511
512impl From<Timestamp> for SystemTime {
513    fn from(val: Timestamp) -> Self {
514        UNIX_EPOCH
515            .checked_add(Duration::new(val.seconds, val.nanos))
516            .unwrap()
517    }
518}
519
520impl From<SystemTime> for Timestamp {
521    fn from(time: SystemTime) -> Self {
522        let duration = time.duration_since(UNIX_EPOCH).unwrap();
523        Self {
524            seconds: duration.as_secs(),
525            nanos: duration.subsec_nanos(),
526        }
527    }
528}
529
530impl From<u128> for Nonce {
531    fn from(nonce: u128) -> Self {
532        let upper_half = (nonce >> 64) as u64;
533        let lower_half = nonce as u64;
534        Self {
535            upper_half,
536            lower_half,
537        }
538    }
539}
540
541impl From<Nonce> for u128 {
542    fn from(nonce: Nonce) -> Self {
543        let upper_half = (nonce.upper_half as u128) << 64;
544        let lower_half = nonce.lower_half as u128;
545        upper_half | lower_half
546    }
547}
548
549pub fn split_worktree_update(
550    mut message: UpdateWorktree,
551    max_chunk_size: usize,
552) -> impl Iterator<Item = UpdateWorktree> {
553    let mut done_files = false;
554
555    let mut repository_map = message
556        .updated_repositories
557        .into_iter()
558        .map(|repo| (repo.work_directory_id, repo))
559        .collect::<HashMap<_, _>>();
560
561    iter::from_fn(move || {
562        if done_files {
563            return None;
564        }
565
566        let updated_entries_chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
567        let updated_entries: Vec<_> = message
568            .updated_entries
569            .drain(..updated_entries_chunk_size)
570            .collect();
571
572        let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size);
573        let removed_entries = message
574            .removed_entries
575            .drain(..removed_entries_chunk_size)
576            .collect();
577
578        done_files = message.updated_entries.is_empty() && message.removed_entries.is_empty();
579
580        let mut updated_repositories = Vec::new();
581
582        if !repository_map.is_empty() {
583            for entry in &updated_entries {
584                if let Some(repo) = repository_map.remove(&entry.id) {
585                    updated_repositories.push(repo)
586                }
587            }
588        }
589
590        let removed_repositories = if done_files {
591            mem::take(&mut message.removed_repositories)
592        } else {
593            Default::default()
594        };
595
596        if done_files {
597            updated_repositories.extend(mem::take(&mut repository_map).into_values());
598        }
599
600        Some(UpdateWorktree {
601            project_id: message.project_id,
602            worktree_id: message.worktree_id,
603            root_name: message.root_name.clone(),
604            abs_path: message.abs_path.clone(),
605            updated_entries,
606            removed_entries,
607            scan_id: message.scan_id,
608            is_last_update: done_files && message.is_last_update,
609            updated_repositories,
610            removed_repositories,
611        })
612    })
613}
614
615#[cfg(test)]
616mod tests {
617    use super::*;
618
619    #[gpui::test]
620    async fn test_buffer_size() {
621        let (tx, rx) = futures::channel::mpsc::unbounded();
622        let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
623        sink.write(Message::Envelope(Envelope {
624            payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
625                root_name: "abcdefg".repeat(10),
626                ..Default::default()
627            })),
628            ..Default::default()
629        }))
630        .await
631        .unwrap();
632        assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
633        sink.write(Message::Envelope(Envelope {
634            payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
635                root_name: "abcdefg".repeat(1000000),
636                ..Default::default()
637            })),
638            ..Default::default()
639        }))
640        .await
641        .unwrap();
642        assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
643
644        let mut stream = MessageStream::new(rx.map(anyhow::Ok));
645        stream.read().await.unwrap();
646        assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
647        stream.read().await.unwrap();
648        assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
649    }
650
651    #[gpui::test]
652    fn test_converting_peer_id_from_and_to_u64() {
653        let peer_id = PeerId {
654            owner_id: 10,
655            id: 3,
656        };
657        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
658        let peer_id = PeerId {
659            owner_id: u32::MAX,
660            id: 3,
661        };
662        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
663        let peer_id = PeerId {
664            owner_id: 10,
665            id: u32::MAX,
666        };
667        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
668        let peer_id = PeerId {
669            owner_id: u32::MAX,
670            id: u32::MAX,
671        };
672        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
673    }
674}