proto.rs

  1#![allow(non_snake_case)]
  2
  3use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
  4use anyhow::{anyhow, Result};
  5use async_tungstenite::tungstenite::Message as WebSocketMessage;
  6use collections::HashMap;
  7use futures::{SinkExt as _, StreamExt as _};
  8use prost::Message as _;
  9use serde::Serialize;
 10use std::any::{Any, TypeId};
 11use std::{
 12    cmp,
 13    fmt::Debug,
 14    io, iter,
 15    time::{Duration, SystemTime, UNIX_EPOCH},
 16};
 17use std::{fmt, mem};
 18
 19include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
 20
 21pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
 22    const NAME: &'static str;
 23    const PRIORITY: MessagePriority;
 24    fn into_envelope(
 25        self,
 26        id: u32,
 27        responding_to: Option<u32>,
 28        original_sender_id: Option<PeerId>,
 29    ) -> Envelope;
 30    fn from_envelope(envelope: Envelope) -> Option<Self>;
 31}
 32
 33pub trait EntityMessage: EnvelopedMessage {
 34    fn remote_entity_id(&self) -> u64;
 35}
 36
 37pub trait RequestMessage: EnvelopedMessage {
 38    type Response: EnvelopedMessage;
 39}
 40
 41pub trait AnyTypedEnvelope: 'static + Send + Sync {
 42    fn payload_type_id(&self) -> TypeId;
 43    fn payload_type_name(&self) -> &'static str;
 44    fn as_any(&self) -> &dyn Any;
 45    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
 46    fn is_background(&self) -> bool;
 47    fn original_sender_id(&self) -> Option<PeerId>;
 48    fn sender_id(&self) -> ConnectionId;
 49    fn message_id(&self) -> u32;
 50}
 51
 52pub enum MessagePriority {
 53    Foreground,
 54    Background,
 55}
 56
 57impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
 58    fn payload_type_id(&self) -> TypeId {
 59        TypeId::of::<T>()
 60    }
 61
 62    fn payload_type_name(&self) -> &'static str {
 63        T::NAME
 64    }
 65
 66    fn as_any(&self) -> &dyn Any {
 67        self
 68    }
 69
 70    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
 71        self
 72    }
 73
 74    fn is_background(&self) -> bool {
 75        matches!(T::PRIORITY, MessagePriority::Background)
 76    }
 77
 78    fn original_sender_id(&self) -> Option<PeerId> {
 79        self.original_sender_id
 80    }
 81
 82    fn sender_id(&self) -> ConnectionId {
 83        self.sender_id
 84    }
 85
 86    fn message_id(&self) -> u32 {
 87        self.message_id
 88    }
 89}
 90
 91impl PeerId {
 92    pub fn from_u64(peer_id: u64) -> Self {
 93        let owner_id = (peer_id >> 32) as u32;
 94        let id = peer_id as u32;
 95        Self { owner_id, id }
 96    }
 97
 98    pub fn as_u64(self) -> u64 {
 99        ((self.owner_id as u64) << 32) | (self.id as u64)
100    }
101}
102
103impl Copy for PeerId {}
104
105impl Eq for PeerId {}
106
107impl Ord for PeerId {
108    fn cmp(&self, other: &Self) -> cmp::Ordering {
109        self.owner_id
110            .cmp(&other.owner_id)
111            .then_with(|| self.id.cmp(&other.id))
112    }
113}
114
115impl PartialOrd for PeerId {
116    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
117        Some(self.cmp(other))
118    }
119}
120
121impl std::hash::Hash for PeerId {
122    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
123        self.owner_id.hash(state);
124        self.id.hash(state);
125    }
126}
127
128impl fmt::Display for PeerId {
129    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130        write!(f, "{}/{}", self.owner_id, self.id)
131    }
132}
133
134messages!(
135    (Ack, Foreground),
136    (AddProjectCollaborator, Foreground),
137    (ApplyCodeAction, Background),
138    (ApplyCodeActionResponse, Background),
139    (ApplyCompletionAdditionalEdits, Background),
140    (ApplyCompletionAdditionalEditsResponse, Background),
141    (BufferReloaded, Foreground),
142    (BufferSaved, Foreground),
143    (Call, Foreground),
144    (CallCanceled, Foreground),
145    (CancelCall, Foreground),
146    (CopyProjectEntry, Foreground),
147    (CreateBufferForPeer, Foreground),
148    (CreateChannel, Foreground),
149    (ChannelResponse, Foreground),
150    (ChannelMessageSent, Foreground),
151    (CreateProjectEntry, Foreground),
152    (CreateRoom, Foreground),
153    (CreateRoomResponse, Foreground),
154    (DeclineCall, Foreground),
155    (DeleteProjectEntry, Foreground),
156    (Error, Foreground),
157    (ExpandProjectEntry, Foreground),
158    (Follow, Foreground),
159    (FollowResponse, Foreground),
160    (FormatBuffers, Foreground),
161    (FormatBuffersResponse, Foreground),
162    (FuzzySearchUsers, Foreground),
163    (GetCodeActions, Background),
164    (GetCodeActionsResponse, Background),
165    (GetHover, Background),
166    (GetHoverResponse, Background),
167    (GetChannelMessages, Background),
168    (GetChannelMessagesResponse, Background),
169    (SendChannelMessage, Background),
170    (SendChannelMessageResponse, Background),
171    (GetCompletions, Background),
172    (GetCompletionsResponse, Background),
173    (GetDefinition, Background),
174    (GetDefinitionResponse, Background),
175    (GetTypeDefinition, Background),
176    (GetTypeDefinitionResponse, Background),
177    (GetDocumentHighlights, Background),
178    (GetDocumentHighlightsResponse, Background),
179    (GetReferences, Background),
180    (GetReferencesResponse, Background),
181    (GetProjectSymbols, Background),
182    (GetProjectSymbolsResponse, Background),
183    (GetUsers, Foreground),
184    (Hello, Foreground),
185    (IncomingCall, Foreground),
186    (InviteChannelMember, Foreground),
187    (UsersResponse, Foreground),
188    (JoinProject, Foreground),
189    (JoinProjectResponse, Foreground),
190    (JoinRoom, Foreground),
191    (JoinRoomResponse, Foreground),
192    (JoinChannelChat, Foreground),
193    (JoinChannelChatResponse, Foreground),
194    (LeaveChannelChat, Foreground),
195    (LeaveProject, Foreground),
196    (LeaveRoom, Foreground),
197    (OpenBufferById, Background),
198    (OpenBufferByPath, Background),
199    (OpenBufferForSymbol, Background),
200    (OpenBufferForSymbolResponse, Background),
201    (OpenBufferResponse, Background),
202    (PerformRename, Background),
203    (PerformRenameResponse, Background),
204    (OnTypeFormatting, Background),
205    (OnTypeFormattingResponse, Background),
206    (InlayHints, Background),
207    (InlayHintsResponse, Background),
208    (ResolveInlayHint, Background),
209    (ResolveInlayHintResponse, Background),
210    (RefreshInlayHints, Foreground),
211    (Ping, Foreground),
212    (PrepareRename, Background),
213    (PrepareRenameResponse, Background),
214    (ExpandProjectEntryResponse, Foreground),
215    (ProjectEntryResponse, Foreground),
216    (RejoinRoom, Foreground),
217    (RejoinRoomResponse, Foreground),
218    (RemoveContact, Foreground),
219    (RemoveChannelMember, Foreground),
220    (ReloadBuffers, Foreground),
221    (ReloadBuffersResponse, Foreground),
222    (RemoveProjectCollaborator, Foreground),
223    (RenameProjectEntry, Foreground),
224    (RequestContact, Foreground),
225    (RespondToContactRequest, Foreground),
226    (RespondToChannelInvite, Foreground),
227    (JoinChannel, Foreground),
228    (RoomUpdated, Foreground),
229    (SaveBuffer, Foreground),
230    (RenameChannel, Foreground),
231    (SetChannelMemberAdmin, Foreground),
232    (SearchProject, Background),
233    (SearchProjectResponse, Background),
234    (ShareProject, Foreground),
235    (ShareProjectResponse, Foreground),
236    (ShowContacts, Foreground),
237    (StartLanguageServer, Foreground),
238    (SynchronizeBuffers, Foreground),
239    (SynchronizeBuffersResponse, Foreground),
240    (RejoinChannelBuffers, Foreground),
241    (RejoinChannelBuffersResponse, Foreground),
242    (Test, Foreground),
243    (Unfollow, Foreground),
244    (UnshareProject, Foreground),
245    (UpdateBuffer, Foreground),
246    (UpdateBufferFile, Foreground),
247    (UpdateContacts, Foreground),
248    (RemoveChannel, Foreground),
249    (UpdateChannels, Foreground),
250    (UpdateDiagnosticSummary, Foreground),
251    (UpdateFollowers, Foreground),
252    (UpdateInviteInfo, Foreground),
253    (UpdateLanguageServer, Foreground),
254    (UpdateParticipantLocation, Foreground),
255    (UpdateProject, Foreground),
256    (UpdateProjectCollaborator, Foreground),
257    (UpdateWorktree, Foreground),
258    (UpdateWorktreeSettings, Foreground),
259    (UpdateDiffBase, Foreground),
260    (GetPrivateUserInfo, Foreground),
261    (GetPrivateUserInfoResponse, Foreground),
262    (GetChannelMembers, Foreground),
263    (GetChannelMembersResponse, Foreground),
264    (JoinChannelBuffer, Foreground),
265    (JoinChannelBufferResponse, Foreground),
266    (LeaveChannelBuffer, Background),
267    (UpdateChannelBuffer, Foreground),
268    (RemoveChannelBufferCollaborator, Foreground),
269    (AddChannelBufferCollaborator, Foreground),
270    (UpdateChannelBufferCollaborator, Foreground),
271);
272
273request_messages!(
274    (ApplyCodeAction, ApplyCodeActionResponse),
275    (
276        ApplyCompletionAdditionalEdits,
277        ApplyCompletionAdditionalEditsResponse
278    ),
279    (Call, Ack),
280    (CancelCall, Ack),
281    (CopyProjectEntry, ProjectEntryResponse),
282    (CreateProjectEntry, ProjectEntryResponse),
283    (CreateRoom, CreateRoomResponse),
284    (CreateChannel, ChannelResponse),
285    (DeclineCall, Ack),
286    (DeleteProjectEntry, ProjectEntryResponse),
287    (ExpandProjectEntry, ExpandProjectEntryResponse),
288    (Follow, FollowResponse),
289    (FormatBuffers, FormatBuffersResponse),
290    (GetCodeActions, GetCodeActionsResponse),
291    (GetHover, GetHoverResponse),
292    (GetCompletions, GetCompletionsResponse),
293    (GetDefinition, GetDefinitionResponse),
294    (GetTypeDefinition, GetTypeDefinitionResponse),
295    (GetDocumentHighlights, GetDocumentHighlightsResponse),
296    (GetReferences, GetReferencesResponse),
297    (GetPrivateUserInfo, GetPrivateUserInfoResponse),
298    (GetProjectSymbols, GetProjectSymbolsResponse),
299    (FuzzySearchUsers, UsersResponse),
300    (GetUsers, UsersResponse),
301    (InviteChannelMember, Ack),
302    (JoinProject, JoinProjectResponse),
303    (JoinRoom, JoinRoomResponse),
304    (JoinChannelChat, JoinChannelChatResponse),
305    (LeaveRoom, Ack),
306    (RejoinRoom, RejoinRoomResponse),
307    (IncomingCall, Ack),
308    (OpenBufferById, OpenBufferResponse),
309    (OpenBufferByPath, OpenBufferResponse),
310    (OpenBufferForSymbol, OpenBufferForSymbolResponse),
311    (Ping, Ack),
312    (PerformRename, PerformRenameResponse),
313    (PrepareRename, PrepareRenameResponse),
314    (OnTypeFormatting, OnTypeFormattingResponse),
315    (InlayHints, InlayHintsResponse),
316    (ResolveInlayHint, ResolveInlayHintResponse),
317    (RefreshInlayHints, Ack),
318    (ReloadBuffers, ReloadBuffersResponse),
319    (RequestContact, Ack),
320    (RemoveChannelMember, Ack),
321    (RemoveContact, Ack),
322    (RespondToContactRequest, Ack),
323    (RespondToChannelInvite, Ack),
324    (SetChannelMemberAdmin, Ack),
325    (SendChannelMessage, SendChannelMessageResponse),
326    (GetChannelMessages, GetChannelMessagesResponse),
327    (GetChannelMembers, GetChannelMembersResponse),
328    (JoinChannel, JoinRoomResponse),
329    (RemoveChannel, Ack),
330    (RenameProjectEntry, ProjectEntryResponse),
331    (RenameChannel, ChannelResponse),
332    (SaveBuffer, BufferSaved),
333    (SearchProject, SearchProjectResponse),
334    (ShareProject, ShareProjectResponse),
335    (SynchronizeBuffers, SynchronizeBuffersResponse),
336    (RejoinChannelBuffers, RejoinChannelBuffersResponse),
337    (Test, Test),
338    (UpdateBuffer, Ack),
339    (UpdateParticipantLocation, Ack),
340    (UpdateProject, Ack),
341    (UpdateWorktree, Ack),
342    (JoinChannelBuffer, JoinChannelBufferResponse),
343    (LeaveChannelBuffer, Ack)
344);
345
346entity_messages!(
347    project_id,
348    AddProjectCollaborator,
349    ApplyCodeAction,
350    ApplyCompletionAdditionalEdits,
351    BufferReloaded,
352    BufferSaved,
353    CopyProjectEntry,
354    CreateBufferForPeer,
355    CreateProjectEntry,
356    DeleteProjectEntry,
357    ExpandProjectEntry,
358    Follow,
359    FormatBuffers,
360    GetCodeActions,
361    GetCompletions,
362    GetDefinition,
363    GetTypeDefinition,
364    GetDocumentHighlights,
365    GetHover,
366    GetReferences,
367    GetProjectSymbols,
368    JoinProject,
369    LeaveProject,
370    OpenBufferById,
371    OpenBufferByPath,
372    OpenBufferForSymbol,
373    PerformRename,
374    OnTypeFormatting,
375    InlayHints,
376    ResolveInlayHint,
377    RefreshInlayHints,
378    PrepareRename,
379    ReloadBuffers,
380    RemoveProjectCollaborator,
381    RenameProjectEntry,
382    SaveBuffer,
383    SearchProject,
384    StartLanguageServer,
385    SynchronizeBuffers,
386    Unfollow,
387    UnshareProject,
388    UpdateBuffer,
389    UpdateBufferFile,
390    UpdateDiagnosticSummary,
391    UpdateFollowers,
392    UpdateLanguageServer,
393    UpdateProject,
394    UpdateProjectCollaborator,
395    UpdateWorktree,
396    UpdateWorktreeSettings,
397    UpdateDiffBase
398);
399
400entity_messages!(
401    channel_id,
402    ChannelMessageSent,
403    UpdateChannelBuffer,
404    RemoveChannelBufferCollaborator,
405    AddChannelBufferCollaborator,
406    UpdateChannelBufferCollaborator
407);
408
/// Number of bytes in one kibibyte (2^10).
const KIB: usize = 1 << 10;
/// Number of bytes in one mebibyte (2^20).
const MIB: usize = KIB << 10;
/// Capacity, in bytes, that a `MessageStream` scratch buffer is shrunk back
/// towards after each message.
const MAX_BUFFER_LEN: usize = MIB;
412
/// A stream of protobuf messages layered over an underlying transport `S`.
///
/// The `write` and `read` methods are available when `S` is, respectively, a
/// sink or a stream of WebSocket messages.
pub struct MessageStream<S> {
    /// The wrapped transport.
    stream: S,
    /// Scratch space reused between calls: holds the uncompressed protobuf
    /// bytes of the message currently being written or read.
    encoding_buffer: Vec<u8>,
}
418
419#[allow(clippy::large_enum_variant)]
420#[derive(Debug)]
421pub enum Message {
422    Envelope(Envelope),
423    Ping,
424    Pong,
425}
426
427impl<S> MessageStream<S> {
428    pub fn new(stream: S) -> Self {
429        Self {
430            stream,
431            encoding_buffer: Vec::new(),
432        }
433    }
434
435    pub fn inner_mut(&mut self) -> &mut S {
436        &mut self.stream
437    }
438}
439
440impl<S> MessageStream<S>
441where
442    S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
443{
444    pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
445        #[cfg(any(test, feature = "test-support"))]
446        const COMPRESSION_LEVEL: i32 = -7;
447
448        #[cfg(not(any(test, feature = "test-support")))]
449        const COMPRESSION_LEVEL: i32 = 4;
450
451        match message {
452            Message::Envelope(message) => {
453                self.encoding_buffer.reserve(message.encoded_len());
454                message
455                    .encode(&mut self.encoding_buffer)
456                    .map_err(io::Error::from)?;
457                let buffer =
458                    zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
459                        .unwrap();
460
461                self.encoding_buffer.clear();
462                self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
463                self.stream.send(WebSocketMessage::Binary(buffer)).await?;
464            }
465            Message::Ping => {
466                self.stream
467                    .send(WebSocketMessage::Ping(Default::default()))
468                    .await?;
469            }
470            Message::Pong => {
471                self.stream
472                    .send(WebSocketMessage::Pong(Default::default()))
473                    .await?;
474            }
475        }
476
477        Ok(())
478    }
479}
480
481impl<S> MessageStream<S>
482where
483    S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
484{
485    pub async fn read(&mut self) -> Result<Message, anyhow::Error> {
486        while let Some(bytes) = self.stream.next().await {
487            match bytes? {
488                WebSocketMessage::Binary(bytes) => {
489                    zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
490                    let envelope = Envelope::decode(self.encoding_buffer.as_slice())
491                        .map_err(io::Error::from)?;
492
493                    self.encoding_buffer.clear();
494                    self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
495                    return Ok(Message::Envelope(envelope));
496                }
497                WebSocketMessage::Ping(_) => return Ok(Message::Ping),
498                WebSocketMessage::Pong(_) => return Ok(Message::Pong),
499                WebSocketMessage::Close(_) => break,
500                _ => {}
501            }
502        }
503        Err(anyhow!("connection closed"))
504    }
505}
506
507impl From<Timestamp> for SystemTime {
508    fn from(val: Timestamp) -> Self {
509        UNIX_EPOCH
510            .checked_add(Duration::new(val.seconds, val.nanos))
511            .unwrap()
512    }
513}
514
515impl From<SystemTime> for Timestamp {
516    fn from(time: SystemTime) -> Self {
517        let duration = time.duration_since(UNIX_EPOCH).unwrap();
518        Self {
519            seconds: duration.as_secs(),
520            nanos: duration.subsec_nanos(),
521        }
522    }
523}
524
525impl From<u128> for Nonce {
526    fn from(nonce: u128) -> Self {
527        let upper_half = (nonce >> 64) as u64;
528        let lower_half = nonce as u64;
529        Self {
530            upper_half,
531            lower_half,
532        }
533    }
534}
535
536impl From<Nonce> for u128 {
537    fn from(nonce: Nonce) -> Self {
538        let upper_half = (nonce.upper_half as u128) << 64;
539        let lower_half = nonce.lower_half as u128;
540        upper_half | lower_half
541    }
542}
543
544pub fn split_worktree_update(
545    mut message: UpdateWorktree,
546    max_chunk_size: usize,
547) -> impl Iterator<Item = UpdateWorktree> {
548    let mut done_files = false;
549
550    let mut repository_map = message
551        .updated_repositories
552        .into_iter()
553        .map(|repo| (repo.work_directory_id, repo))
554        .collect::<HashMap<_, _>>();
555
556    iter::from_fn(move || {
557        if done_files {
558            return None;
559        }
560
561        let updated_entries_chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
562        let updated_entries: Vec<_> = message
563            .updated_entries
564            .drain(..updated_entries_chunk_size)
565            .collect();
566
567        let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size);
568        let removed_entries = message
569            .removed_entries
570            .drain(..removed_entries_chunk_size)
571            .collect();
572
573        done_files = message.updated_entries.is_empty() && message.removed_entries.is_empty();
574
575        let mut updated_repositories = Vec::new();
576
577        if !repository_map.is_empty() {
578            for entry in &updated_entries {
579                if let Some(repo) = repository_map.remove(&entry.id) {
580                    updated_repositories.push(repo)
581                }
582            }
583        }
584
585        let removed_repositories = if done_files {
586            mem::take(&mut message.removed_repositories)
587        } else {
588            Default::default()
589        };
590
591        if done_files {
592            updated_repositories.extend(mem::take(&mut repository_map).into_values());
593        }
594
595        Some(UpdateWorktree {
596            project_id: message.project_id,
597            worktree_id: message.worktree_id,
598            root_name: message.root_name.clone(),
599            abs_path: message.abs_path.clone(),
600            updated_entries,
601            removed_entries,
602            scan_id: message.scan_id,
603            is_last_update: done_files && message.is_last_update,
604            updated_repositories,
605            removed_repositories,
606        })
607    })
608}
609
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an envelope carrying an `UpdateWorktree` whose `root_name` is
    /// `"abcdefg"` repeated `repeat_count` times.
    fn worktree_message(repeat_count: usize) -> Message {
        let update = UpdateWorktree {
            root_name: "abcdefg".repeat(repeat_count),
            ..Default::default()
        };
        Message::Envelope(Envelope {
            payload: Some(envelope::Payload::UpdateWorktree(update)),
            ..Default::default()
        })
    }

    /// The scratch buffer must never keep more than `MAX_BUFFER_LEN` bytes of
    /// capacity after a write or a read, for small and for very large messages.
    #[gpui::test]
    async fn test_buffer_size() {
        let (tx, rx) = futures::channel::mpsc::unbounded();

        let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
        for repeat_count in [10, 1000000] {
            sink.write(worktree_message(repeat_count)).await.unwrap();
            assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
        }

        let mut stream = MessageStream::new(rx.map(anyhow::Ok));
        for _ in 0..2 {
            stream.read().await.unwrap();
            assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
        }
    }

    /// Packing a `PeerId` into a `u64` and unpacking it again is lossless,
    /// including at the extremes of both fields.
    #[gpui::test]
    fn test_converting_peer_id_from_and_to_u64() {
        let cases = [
            (10, 3),
            (u32::MAX, 3),
            (10, u32::MAX),
            (u32::MAX, u32::MAX),
        ];
        for (owner_id, id) in cases {
            let peer_id = PeerId { owner_id, id };
            assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
        }
    }
}