proto.rs

  1#![allow(non_snake_case)]
  2
  3use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
  4use anyhow::{anyhow, Result};
  5use async_tungstenite::tungstenite::Message as WebSocketMessage;
  6use collections::HashMap;
  7use futures::{SinkExt as _, StreamExt as _};
  8use prost::Message as _;
  9use serde::Serialize;
 10use std::any::{Any, TypeId};
 11use std::{
 12    cmp,
 13    fmt::Debug,
 14    io, iter,
 15    time::{Duration, SystemTime, UNIX_EPOCH},
 16};
 17use std::{fmt, mem};
 18
 // Pulls in the Rust source generated at build time at `$OUT_DIR/zed.messages.rs`.
 // NOTE(review): presumably the protobuf-generated definitions of `Envelope`, `PeerId`,
 // `UpdateWorktree` and the other message structs used below — confirm against the build script.
 19include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
 20
 /// A message type that can be wrapped in, and recovered from, an `Envelope`.
 ///
 /// NOTE(review): implementations are not visible in this file; presumably they are
 /// generated by the `messages!` macro — confirm against the macro definition.
 21pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
       /// Static name of the message type (reported by `AnyTypedEnvelope::payload_type_name`).
 22    const NAME: &'static str;
       /// Priority class of the message type (consulted by `AnyTypedEnvelope::is_background`).
 23    const PRIORITY: MessagePriority;
       /// Consumes the message and wraps it in an `Envelope`.
       ///
       /// NOTE(review): `id` is presumably the envelope's own message id, `responding_to` the
       /// id of the request being answered (if any), and `original_sender_id` the peer that
       /// first sent a forwarded message — confirm against the generated implementation.
 24    fn into_envelope(
 25        self,
 26        id: u32,
 27        responding_to: Option<u32>,
 28        original_sender_id: Option<PeerId>,
 29    ) -> Envelope;
       /// Attempts to extract a message of this type from `envelope`.
       ///
       /// NOTE(review): presumably returns `None` when the envelope carries a different
       /// payload type — confirm against the generated implementation.
 30    fn from_envelope(envelope: Envelope) -> Option<Self>;
 31}
 32
 /// An [`EnvelopedMessage`] that carries the id of a remote entity.
 33pub trait EntityMessage: EnvelopedMessage {
       /// Returns the remote entity id carried by this message.
       ///
       /// NOTE(review): presumably the field named as the first argument of the
       /// `entity_messages!` invocations below (`project_id` / `channel_id`) — confirm.
 34    fn remote_entity_id(&self) -> u64;
 35}
 36
 /// An [`EnvelopedMessage`] that has an associated response message type.
 37pub trait RequestMessage: EnvelopedMessage {
       /// The message type associated with this request as its response.
 38    type Response: EnvelopedMessage;
 39}
 40
 /// Object-safe, type-erased view of a `TypedEnvelope<T>`.
 ///
 /// The method notes below describe the blanket implementation for
 /// `TypedEnvelope<T: EnvelopedMessage>` found in this file.
 41pub trait AnyTypedEnvelope: 'static + Send + Sync {
       /// `TypeId` of the payload type `T`.
 42    fn payload_type_id(&self) -> TypeId;
       /// The payload type's `EnvelopedMessage::NAME`.
 43    fn payload_type_name(&self) -> &'static str;
       /// Borrows the envelope as `&dyn Any` so it can be downcast.
 44    fn as_any(&self) -> &dyn Any;
       /// Converts the boxed envelope into a boxed `Any` so it can be downcast by value.
 45    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
       /// Whether the payload type's `PRIORITY` is `MessagePriority::Background`.
 46    fn is_background(&self) -> bool;
       /// The envelope's `original_sender_id` field.
 47    fn original_sender_id(&self) -> Option<PeerId>;
       /// The envelope's `sender_id` field.
 48    fn sender_id(&self) -> ConnectionId;
       /// The envelope's `message_id` field.
 49    fn message_id(&self) -> u32;
 50}
 51
 52pub enum MessagePriority {
 53    Foreground,
 54    Background,
 55}
 56
 57impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
 58    fn payload_type_id(&self) -> TypeId {
 59        TypeId::of::<T>()
 60    }
 61
 62    fn payload_type_name(&self) -> &'static str {
 63        T::NAME
 64    }
 65
 66    fn as_any(&self) -> &dyn Any {
 67        self
 68    }
 69
 70    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
 71        self
 72    }
 73
 74    fn is_background(&self) -> bool {
 75        matches!(T::PRIORITY, MessagePriority::Background)
 76    }
 77
 78    fn original_sender_id(&self) -> Option<PeerId> {
 79        self.original_sender_id
 80    }
 81
 82    fn sender_id(&self) -> ConnectionId {
 83        self.sender_id
 84    }
 85
 86    fn message_id(&self) -> u32 {
 87        self.message_id
 88    }
 89}
 90
 91impl PeerId {
 92    pub fn from_u64(peer_id: u64) -> Self {
 93        let owner_id = (peer_id >> 32) as u32;
 94        let id = peer_id as u32;
 95        Self { owner_id, id }
 96    }
 97
 98    pub fn as_u64(self) -> u64 {
 99        ((self.owner_id as u64) << 32) | (self.id as u64)
100    }
101}
102
103impl Copy for PeerId {}
104
105impl Eq for PeerId {}
106
107impl Ord for PeerId {
108    fn cmp(&self, other: &Self) -> cmp::Ordering {
109        self.owner_id
110            .cmp(&other.owner_id)
111            .then_with(|| self.id.cmp(&other.id))
112    }
113}
114
115impl PartialOrd for PeerId {
116    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
117        Some(self.cmp(other))
118    }
119}
120
121impl std::hash::Hash for PeerId {
122    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
123        self.owner_id.hash(state);
124        self.id.hash(state);
125    }
126}
127
128impl fmt::Display for PeerId {
129    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130        write!(f, "{}/{}", self.owner_id, self.id)
131    }
132}
133
// Registers every message type together with its `MessagePriority` variant
// (`Foreground` or `Background`).
// NOTE(review): `messages!` is defined outside this file; presumably it generates the
// `EnvelopedMessage` implementation (`NAME`, `PRIORITY`, envelope conversions) for each
// listed type — confirm against the macro definition.
134messages!(
135    (Ack, Foreground),
136    (AddProjectCollaborator, Foreground),
137    (ApplyCodeAction, Background),
138    (ApplyCodeActionResponse, Background),
139    (ApplyCompletionAdditionalEdits, Background),
140    (ApplyCompletionAdditionalEditsResponse, Background),
141    (BufferReloaded, Foreground),
142    (BufferSaved, Foreground),
143    (Call, Foreground),
144    (CallCanceled, Foreground),
145    (CancelCall, Foreground),
146    (CopyProjectEntry, Foreground),
147    (CreateBufferForPeer, Foreground),
148    (CreateChannel, Foreground),
149    (ChannelResponse, Foreground),
150    (ChannelMessageSent, Foreground),
151    (CreateProjectEntry, Foreground),
152    (CreateRoom, Foreground),
153    (CreateRoomResponse, Foreground),
154    (DeclineCall, Foreground),
155    (DeleteProjectEntry, Foreground),
156    (Error, Foreground),
157    (ExpandProjectEntry, Foreground),
158    (Follow, Foreground),
159    (FollowResponse, Foreground),
160    (FormatBuffers, Foreground),
161    (FormatBuffersResponse, Foreground),
162    (FuzzySearchUsers, Foreground),
163    (GetCodeActions, Background),
164    (GetCodeActionsResponse, Background),
165    (GetHover, Background),
166    (GetHoverResponse, Background),
167    (GetChannelMessages, Background),
168    (GetChannelMessagesResponse, Background),
169    (SendChannelMessage, Background),
170    (SendChannelMessageResponse, Background),
171    (GetCompletions, Background),
172    (GetCompletionsResponse, Background),
173    (GetDefinition, Background),
174    (GetDefinitionResponse, Background),
175    (GetTypeDefinition, Background),
176    (GetTypeDefinitionResponse, Background),
177    (GetDocumentHighlights, Background),
178    (GetDocumentHighlightsResponse, Background),
179    (GetReferences, Background),
180    (GetReferencesResponse, Background),
181    (GetProjectSymbols, Background),
182    (GetProjectSymbolsResponse, Background),
183    (GetUsers, Foreground),
184    (Hello, Foreground),
185    (IncomingCall, Foreground),
186    (InviteChannelMember, Foreground),
187    (UsersResponse, Foreground),
188    (JoinProject, Foreground),
189    (JoinProjectResponse, Foreground),
190    (JoinRoom, Foreground),
191    (JoinRoomResponse, Foreground),
192    (JoinChannelChat, Foreground),
193    (JoinChannelChatResponse, Foreground),
194    (LeaveChannelChat, Foreground),
195    (LeaveProject, Foreground),
196    (LeaveRoom, Foreground),
197    (OpenBufferById, Background),
198    (OpenBufferByPath, Background),
199    (OpenBufferForSymbol, Background),
200    (OpenBufferForSymbolResponse, Background),
201    (OpenBufferResponse, Background),
202    (PerformRename, Background),
203    (PerformRenameResponse, Background),
204    (OnTypeFormatting, Background),
205    (OnTypeFormattingResponse, Background),
206    (InlayHints, Background),
207    (InlayHintsResponse, Background),
208    (ResolveInlayHint, Background),
209    (ResolveInlayHintResponse, Background),
210    (RefreshInlayHints, Foreground),
211    (Ping, Foreground),
212    (PrepareRename, Background),
213    (PrepareRenameResponse, Background),
214    (ExpandProjectEntryResponse, Foreground),
215    (ProjectEntryResponse, Foreground),
216    (RejoinRoom, Foreground),
217    (RejoinRoomResponse, Foreground),
218    (RemoveContact, Foreground),
219    (RemoveChannelMember, Foreground),
220    (RemoveChannelMessage, Foreground),
221    (ReloadBuffers, Foreground),
222    (ReloadBuffersResponse, Foreground),
223    (RemoveProjectCollaborator, Foreground),
224    (RenameProjectEntry, Foreground),
225    (RequestContact, Foreground),
226    (RespondToContactRequest, Foreground),
227    (RespondToChannelInvite, Foreground),
228    (JoinChannel, Foreground),
229    (RoomUpdated, Foreground),
230    (SaveBuffer, Foreground),
231    (RenameChannel, Foreground),
232    (SetChannelMemberAdmin, Foreground),
233    (SearchProject, Background),
234    (SearchProjectResponse, Background),
235    (ShareProject, Foreground),
236    (ShareProjectResponse, Foreground),
237    (ShowContacts, Foreground),
238    (StartLanguageServer, Foreground),
239    (SynchronizeBuffers, Foreground),
240    (SynchronizeBuffersResponse, Foreground),
241    (RejoinChannelBuffers, Foreground),
242    (RejoinChannelBuffersResponse, Foreground),
243    (Test, Foreground),
244    (Unfollow, Foreground),
245    (UnshareProject, Foreground),
246    (UpdateBuffer, Foreground),
247    (UpdateBufferFile, Foreground),
248    (UpdateContacts, Foreground),
249    (DeleteChannel, Foreground),
250    (MoveChannel, Foreground),
251    (UpdateChannels, Foreground),
252    (UpdateDiagnosticSummary, Foreground),
253    (UpdateFollowers, Foreground),
254    (UpdateInviteInfo, Foreground),
255    (UpdateLanguageServer, Foreground),
256    (UpdateParticipantLocation, Foreground),
257    (UpdateProject, Foreground),
258    (UpdateProjectCollaborator, Foreground),
259    (UpdateWorktree, Foreground),
260    (UpdateWorktreeSettings, Foreground),
261    (UpdateDiffBase, Foreground),
262    (GetPrivateUserInfo, Foreground),
263    (GetPrivateUserInfoResponse, Foreground),
264    (GetChannelMembers, Foreground),
265    (GetChannelMembersResponse, Foreground),
266    (JoinChannelBuffer, Foreground),
267    (JoinChannelBufferResponse, Foreground),
268    (LeaveChannelBuffer, Background),
269    (UpdateChannelBuffer, Foreground),
270    (RemoveChannelBufferCollaborator, Foreground),
271    (AddChannelBufferCollaborator, Foreground),
272    (UpdateChannelBufferCollaborator, Foreground),
273);
274
275request_messages!(
276    (ApplyCodeAction, ApplyCodeActionResponse),
277    (
278        ApplyCompletionAdditionalEdits,
279        ApplyCompletionAdditionalEditsResponse
280    ),
281    (Call, Ack),
282    (CancelCall, Ack),
283    (CopyProjectEntry, ProjectEntryResponse),
284    (CreateProjectEntry, ProjectEntryResponse),
285    (CreateRoom, CreateRoomResponse),
286    (CreateChannel, ChannelResponse),
287    (DeclineCall, Ack),
288    (DeleteProjectEntry, ProjectEntryResponse),
289    (ExpandProjectEntry, ExpandProjectEntryResponse),
290    (Follow, FollowResponse),
291    (FormatBuffers, FormatBuffersResponse),
292    (GetCodeActions, GetCodeActionsResponse),
293    (GetHover, GetHoverResponse),
294    (GetCompletions, GetCompletionsResponse),
295    (GetDefinition, GetDefinitionResponse),
296    (GetTypeDefinition, GetTypeDefinitionResponse),
297    (GetDocumentHighlights, GetDocumentHighlightsResponse),
298    (GetReferences, GetReferencesResponse),
299    (GetPrivateUserInfo, GetPrivateUserInfoResponse),
300    (GetProjectSymbols, GetProjectSymbolsResponse),
301    (FuzzySearchUsers, UsersResponse),
302    (GetUsers, UsersResponse),
303    (InviteChannelMember, Ack),
304    (JoinProject, JoinProjectResponse),
305    (JoinRoom, JoinRoomResponse),
306    (JoinChannelChat, JoinChannelChatResponse),
307    (LeaveRoom, Ack),
308    (RejoinRoom, RejoinRoomResponse),
309    (IncomingCall, Ack),
310    (OpenBufferById, OpenBufferResponse),
311    (OpenBufferByPath, OpenBufferResponse),
312    (OpenBufferForSymbol, OpenBufferForSymbolResponse),
313    (Ping, Ack),
314    (PerformRename, PerformRenameResponse),
315    (PrepareRename, PrepareRenameResponse),
316    (OnTypeFormatting, OnTypeFormattingResponse),
317    (InlayHints, InlayHintsResponse),
318    (ResolveInlayHint, ResolveInlayHintResponse),
319    (RefreshInlayHints, Ack),
320    (ReloadBuffers, ReloadBuffersResponse),
321    (RequestContact, Ack),
322    (RemoveChannelMember, Ack),
323    (RemoveContact, Ack),
324    (RespondToContactRequest, Ack),
325    (RespondToChannelInvite, Ack),
326    (SetChannelMemberAdmin, Ack),
327    (SendChannelMessage, SendChannelMessageResponse),
328    (GetChannelMessages, GetChannelMessagesResponse),
329    (GetChannelMembers, GetChannelMembersResponse),
330    (JoinChannel, JoinRoomResponse),
331    (RemoveChannel, Ack),
332    (RemoveChannelMessage, Ack),
333    (DeleteChannel, Ack),
334    (RenameProjectEntry, ProjectEntryResponse),
335    (RenameChannel, ChannelResponse),
336    (MoveChannel, Ack),
337    (SaveBuffer, BufferSaved),
338    (SearchProject, SearchProjectResponse),
339    (ShareProject, ShareProjectResponse),
340    (SynchronizeBuffers, SynchronizeBuffersResponse),
341    (RejoinChannelBuffers, RejoinChannelBuffersResponse),
342    (Test, Test),
343    (UpdateBuffer, Ack),
344    (UpdateParticipantLocation, Ack),
345    (UpdateProject, Ack),
346    (UpdateWorktree, Ack),
347    (JoinChannelBuffer, JoinChannelBufferResponse),
348    (LeaveChannelBuffer, Ack)
349);
350
// Message types associated with the `project_id` field.
// NOTE(review): `entity_messages!` is defined outside this file; presumably it implements
// `EntityMessage` for each listed type with `remote_entity_id` returning the named field
// (here `project_id`) — confirm against the macro definition.
351entity_messages!(
352    project_id,
353    AddProjectCollaborator,
354    ApplyCodeAction,
355    ApplyCompletionAdditionalEdits,
356    BufferReloaded,
357    BufferSaved,
358    CopyProjectEntry,
359    CreateBufferForPeer,
360    CreateProjectEntry,
361    DeleteProjectEntry,
362    ExpandProjectEntry,
363    Follow,
364    FormatBuffers,
365    GetCodeActions,
366    GetCompletions,
367    GetDefinition,
368    GetTypeDefinition,
369    GetDocumentHighlights,
370    GetHover,
371    GetReferences,
372    GetProjectSymbols,
373    JoinProject,
374    LeaveProject,
375    OpenBufferById,
376    OpenBufferByPath,
377    OpenBufferForSymbol,
378    PerformRename,
379    OnTypeFormatting,
380    InlayHints,
381    ResolveInlayHint,
382    RefreshInlayHints,
383    PrepareRename,
384    ReloadBuffers,
385    RemoveProjectCollaborator,
386    RenameProjectEntry,
387    SaveBuffer,
388    SearchProject,
389    StartLanguageServer,
390    SynchronizeBuffers,
391    Unfollow,
392    UnshareProject,
393    UpdateBuffer,
394    UpdateBufferFile,
395    UpdateDiagnosticSummary,
396    UpdateFollowers,
397    UpdateLanguageServer,
398    UpdateProject,
399    UpdateProjectCollaborator,
400    UpdateWorktree,
401    UpdateWorktreeSettings,
402    UpdateDiffBase
403);
404
// Message types associated with the `channel_id` field.
// NOTE(review): `entity_messages!` is defined outside this file; presumably it implements
// `EntityMessage` for each listed type with `remote_entity_id` returning the named field
// (here `channel_id`) — confirm against the macro definition.
405entity_messages!(
406    channel_id,
407    ChannelMessageSent,
408    UpdateChannelBuffer,
409    RemoveChannelBufferCollaborator,
410    RemoveChannelMessage,
411    AddChannelBufferCollaborator,
412    UpdateChannelBufferCollaborator
413);
414
/// Bytes in one kibibyte (2^10).
const KIB: usize = 1 << 10;
/// Bytes in one mebibyte (2^20).
const MIB: usize = KIB << 10;
/// Capacity that `MessageStream`'s scratch buffer is shrunk back to (via
/// `Vec::shrink_to`) after each envelope has been written or read.
const MAX_BUFFER_LEN: usize = MIB;
418
419/// A stream of protobuf messages.
///
/// Wraps a transport `S`. Depending on the traits `S` implements, the `write` and `read`
/// methods send and receive `Message`s as WebSocket frames, with envelopes
/// protobuf-encoded and zstd-compressed.
420pub struct MessageStream<S> {
       /// The wrapped sink and/or stream of WebSocket messages.
421    stream: S,
       /// Scratch buffer reused between calls: holds the uncompressed protobuf bytes while
       /// writing and the decompressed bytes while reading. It is cleared and shrunk to at
       /// most `MAX_BUFFER_LEN` capacity after each envelope.
422    encoding_buffer: Vec<u8>,
423}
424
/// A message sent or received through a `MessageStream`.
// The `Envelope` variant is much larger than the unit variants; the size lint is
// silenced rather than boxing the envelope.
425#[allow(clippy::large_enum_variant)]
426#[derive(Debug)]
427pub enum Message {
       /// A protobuf `Envelope`, carried in a binary WebSocket frame.
428    Envelope(Envelope),
       /// A WebSocket ping frame (any payload is discarded on read; written empty).
429    Ping,
       /// A WebSocket pong frame (any payload is discarded on read; written empty).
430    Pong,
431}
432
433impl<S> MessageStream<S> {
434    pub fn new(stream: S) -> Self {
435        Self {
436            stream,
437            encoding_buffer: Vec::new(),
438        }
439    }
440
441    pub fn inner_mut(&mut self) -> &mut S {
442        &mut self.stream
443    }
444}
445
446impl<S> MessageStream<S>
447where
448    S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
449{
450    pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
451        #[cfg(any(test, feature = "test-support"))]
452        const COMPRESSION_LEVEL: i32 = -7;
453
454        #[cfg(not(any(test, feature = "test-support")))]
455        const COMPRESSION_LEVEL: i32 = 4;
456
457        match message {
458            Message::Envelope(message) => {
459                self.encoding_buffer.reserve(message.encoded_len());
460                message
461                    .encode(&mut self.encoding_buffer)
462                    .map_err(io::Error::from)?;
463                let buffer =
464                    zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
465                        .unwrap();
466
467                self.encoding_buffer.clear();
468                self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
469                self.stream.send(WebSocketMessage::Binary(buffer)).await?;
470            }
471            Message::Ping => {
472                self.stream
473                    .send(WebSocketMessage::Ping(Default::default()))
474                    .await?;
475            }
476            Message::Pong => {
477                self.stream
478                    .send(WebSocketMessage::Pong(Default::default()))
479                    .await?;
480            }
481        }
482
483        Ok(())
484    }
485}
486
487impl<S> MessageStream<S>
488where
489    S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
490{
491    pub async fn read(&mut self) -> Result<Message, anyhow::Error> {
492        while let Some(bytes) = self.stream.next().await {
493            match bytes? {
494                WebSocketMessage::Binary(bytes) => {
495                    zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
496                    let envelope = Envelope::decode(self.encoding_buffer.as_slice())
497                        .map_err(io::Error::from)?;
498
499                    self.encoding_buffer.clear();
500                    self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
501                    return Ok(Message::Envelope(envelope));
502                }
503                WebSocketMessage::Ping(_) => return Ok(Message::Ping),
504                WebSocketMessage::Pong(_) => return Ok(Message::Pong),
505                WebSocketMessage::Close(_) => break,
506                _ => {}
507            }
508        }
509        Err(anyhow!("connection closed"))
510    }
511}
512
513impl From<Timestamp> for SystemTime {
514    fn from(val: Timestamp) -> Self {
515        UNIX_EPOCH
516            .checked_add(Duration::new(val.seconds, val.nanos))
517            .unwrap()
518    }
519}
520
521impl From<SystemTime> for Timestamp {
522    fn from(time: SystemTime) -> Self {
523        let duration = time.duration_since(UNIX_EPOCH).unwrap();
524        Self {
525            seconds: duration.as_secs(),
526            nanos: duration.subsec_nanos(),
527        }
528    }
529}
530
531impl From<u128> for Nonce {
532    fn from(nonce: u128) -> Self {
533        let upper_half = (nonce >> 64) as u64;
534        let lower_half = nonce as u64;
535        Self {
536            upper_half,
537            lower_half,
538        }
539    }
540}
541
542impl From<Nonce> for u128 {
543    fn from(nonce: Nonce) -> Self {
544        let upper_half = (nonce.upper_half as u128) << 64;
545        let lower_half = nonce.lower_half as u128;
546        upper_half | lower_half
547    }
548}
549
550pub fn split_worktree_update(
551    mut message: UpdateWorktree,
552    max_chunk_size: usize,
553) -> impl Iterator<Item = UpdateWorktree> {
554    let mut done_files = false;
555
556    let mut repository_map = message
557        .updated_repositories
558        .into_iter()
559        .map(|repo| (repo.work_directory_id, repo))
560        .collect::<HashMap<_, _>>();
561
562    iter::from_fn(move || {
563        if done_files {
564            return None;
565        }
566
567        let updated_entries_chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
568        let updated_entries: Vec<_> = message
569            .updated_entries
570            .drain(..updated_entries_chunk_size)
571            .collect();
572
573        let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size);
574        let removed_entries = message
575            .removed_entries
576            .drain(..removed_entries_chunk_size)
577            .collect();
578
579        done_files = message.updated_entries.is_empty() && message.removed_entries.is_empty();
580
581        let mut updated_repositories = Vec::new();
582
583        if !repository_map.is_empty() {
584            for entry in &updated_entries {
585                if let Some(repo) = repository_map.remove(&entry.id) {
586                    updated_repositories.push(repo)
587                }
588            }
589        }
590
591        let removed_repositories = if done_files {
592            mem::take(&mut message.removed_repositories)
593        } else {
594            Default::default()
595        };
596
597        if done_files {
598            updated_repositories.extend(mem::take(&mut repository_map).into_values());
599        }
600
601        Some(UpdateWorktree {
602            project_id: message.project_id,
603            worktree_id: message.worktree_id,
604            root_name: message.root_name.clone(),
605            abs_path: message.abs_path.clone(),
606            updated_entries,
607            removed_entries,
608            scan_id: message.scan_id,
609            is_last_update: done_files && message.is_last_update,
610            updated_repositories,
611            removed_repositories,
612        })
613    })
614}
615
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an envelope message whose payload is an `UpdateWorktree` with the given
    /// root name and default values everywhere else.
    fn worktree_message(root_name: String) -> Message {
        let update = UpdateWorktree {
            root_name,
            ..Default::default()
        };
        Message::Envelope(Envelope {
            payload: Some(envelope::Payload::UpdateWorktree(update)),
            ..Default::default()
        })
    }

    /// The scratch buffer must never retain more than `MAX_BUFFER_LEN` of capacity after
    /// a message has been written or read, whether the message was small or large.
    #[gpui::test]
    async fn test_buffer_size() {
        let (tx, rx) = futures::channel::mpsc::unbounded();
        let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));

        // One small message, then one far larger than the buffer limit.
        for repetitions in [10, 1000000] {
            sink.write(worktree_message("abcdefg".repeat(repetitions)))
                .await
                .unwrap();
            assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
        }

        let mut stream = MessageStream::new(rx.map(anyhow::Ok));
        for _ in 0..2 {
            stream.read().await.unwrap();
            assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
        }
    }

    /// `PeerId` must survive a round trip through its `u64` representation, including at
    /// the extremes of both fields.
    #[gpui::test]
    fn test_converting_peer_id_from_and_to_u64() {
        let cases = [
            (10, 3),
            (u32::MAX, 3),
            (10, u32::MAX),
            (u32::MAX, u32::MAX),
        ];
        for (owner_id, id) in cases {
            let peer_id = PeerId { owner_id, id };
            assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
        }
    }
}