proto.rs

  1#![allow(non_snake_case)]
  2
  3use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
  4use anyhow::{anyhow, Result};
  5use async_tungstenite::tungstenite::Message as WebSocketMessage;
  6use collections::HashMap;
  7use futures::{SinkExt as _, StreamExt as _};
  8use prost::Message as _;
  9use serde::Serialize;
 10use std::any::{Any, TypeId};
 11use std::{
 12    cmp,
 13    fmt::Debug,
 14    io, iter,
 15    time::{Duration, SystemTime, UNIX_EPOCH},
 16};
 17use std::{fmt, mem};
 18
 19include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
 20
 21pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
 22    const NAME: &'static str;
 23    const PRIORITY: MessagePriority;
 24    fn into_envelope(
 25        self,
 26        id: u32,
 27        responding_to: Option<u32>,
 28        original_sender_id: Option<PeerId>,
 29    ) -> Envelope;
 30    fn from_envelope(envelope: Envelope) -> Option<Self>;
 31}
 32
 33pub trait EntityMessage: EnvelopedMessage {
 34    fn remote_entity_id(&self) -> u64;
 35}
 36
 37pub trait RequestMessage: EnvelopedMessage {
 38    type Response: EnvelopedMessage;
 39}
 40
 41pub trait AnyTypedEnvelope: 'static + Send + Sync {
 42    fn payload_type_id(&self) -> TypeId;
 43    fn payload_type_name(&self) -> &'static str;
 44    fn as_any(&self) -> &dyn Any;
 45    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
 46    fn is_background(&self) -> bool;
 47    fn original_sender_id(&self) -> Option<PeerId>;
 48    fn sender_id(&self) -> ConnectionId;
 49    fn message_id(&self) -> u32;
 50}
 51
 52pub enum MessagePriority {
 53    Foreground,
 54    Background,
 55}
 56
 57impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
 58    fn payload_type_id(&self) -> TypeId {
 59        TypeId::of::<T>()
 60    }
 61
 62    fn payload_type_name(&self) -> &'static str {
 63        T::NAME
 64    }
 65
 66    fn as_any(&self) -> &dyn Any {
 67        self
 68    }
 69
 70    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
 71        self
 72    }
 73
 74    fn is_background(&self) -> bool {
 75        matches!(T::PRIORITY, MessagePriority::Background)
 76    }
 77
 78    fn original_sender_id(&self) -> Option<PeerId> {
 79        self.original_sender_id
 80    }
 81
 82    fn sender_id(&self) -> ConnectionId {
 83        self.sender_id
 84    }
 85
 86    fn message_id(&self) -> u32 {
 87        self.message_id
 88    }
 89}
 90
 91impl PeerId {
 92    pub fn from_u64(peer_id: u64) -> Self {
 93        let owner_id = (peer_id >> 32) as u32;
 94        let id = peer_id as u32;
 95        Self { owner_id, id }
 96    }
 97
 98    pub fn as_u64(self) -> u64 {
 99        ((self.owner_id as u64) << 32) | (self.id as u64)
100    }
101}
102
103impl Copy for PeerId {}
104
105impl Eq for PeerId {}
106
107impl Ord for PeerId {
108    fn cmp(&self, other: &Self) -> cmp::Ordering {
109        self.owner_id
110            .cmp(&other.owner_id)
111            .then_with(|| self.id.cmp(&other.id))
112    }
113}
114
115impl PartialOrd for PeerId {
116    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
117        Some(self.cmp(other))
118    }
119}
120
121impl std::hash::Hash for PeerId {
122    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
123        self.owner_id.hash(state);
124        self.id.hash(state);
125    }
126}
127
128impl fmt::Display for PeerId {
129    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130        write!(f, "{}/{}", self.owner_id, self.id)
131    }
132}
133
134messages!(
135    (Ack, Foreground),
136    (AddProjectCollaborator, Foreground),
137    (ApplyCodeAction, Background),
138    (ApplyCodeActionResponse, Background),
139    (ApplyCompletionAdditionalEdits, Background),
140    (ApplyCompletionAdditionalEditsResponse, Background),
141    (BufferReloaded, Foreground),
142    (BufferSaved, Foreground),
143    (Call, Foreground),
144    (CallCanceled, Foreground),
145    (CancelCall, Foreground),
146    (CopyProjectEntry, Foreground),
147    (CreateBufferForPeer, Foreground),
148    (CreateChannel, Foreground),
149    (CreateChannelResponse, Foreground),
150    (ChannelMessageSent, Foreground),
151    (CreateProjectEntry, Foreground),
152    (CreateRoom, Foreground),
153    (CreateRoomResponse, Foreground),
154    (DeclineCall, Foreground),
155    (DeleteProjectEntry, Foreground),
156    (Error, Foreground),
157    (ExpandProjectEntry, Foreground),
158    (Follow, Foreground),
159    (FollowResponse, Foreground),
160    (FormatBuffers, Foreground),
161    (FormatBuffersResponse, Foreground),
162    (FuzzySearchUsers, Foreground),
163    (GetCodeActions, Background),
164    (GetCodeActionsResponse, Background),
165    (GetHover, Background),
166    (GetHoverResponse, Background),
167    (GetChannelMessages, Background),
168    (GetChannelMessagesResponse, Background),
169    (SendChannelMessage, Background),
170    (SendChannelMessageResponse, Background),
171    (GetCompletions, Background),
172    (GetCompletionsResponse, Background),
173    (GetDefinition, Background),
174    (GetDefinitionResponse, Background),
175    (GetTypeDefinition, Background),
176    (GetTypeDefinitionResponse, Background),
177    (GetDocumentHighlights, Background),
178    (GetDocumentHighlightsResponse, Background),
179    (GetReferences, Background),
180    (GetReferencesResponse, Background),
181    (GetProjectSymbols, Background),
182    (GetProjectSymbolsResponse, Background),
183    (GetUsers, Foreground),
184    (Hello, Foreground),
185    (IncomingCall, Foreground),
186    (InviteChannelMember, Foreground),
187    (UsersResponse, Foreground),
188    (JoinProject, Foreground),
189    (JoinProjectResponse, Foreground),
190    (JoinRoom, Foreground),
191    (JoinRoomResponse, Foreground),
192    (JoinChannelChat, Foreground),
193    (JoinChannelChatResponse, Foreground),
194    (LeaveChannelChat, Foreground),
195    (LeaveProject, Foreground),
196    (LeaveRoom, Foreground),
197    (OpenBufferById, Background),
198    (OpenBufferByPath, Background),
199    (OpenBufferForSymbol, Background),
200    (OpenBufferForSymbolResponse, Background),
201    (OpenBufferResponse, Background),
202    (PerformRename, Background),
203    (PerformRenameResponse, Background),
204    (OnTypeFormatting, Background),
205    (OnTypeFormattingResponse, Background),
206    (InlayHints, Background),
207    (InlayHintsResponse, Background),
208    (ResolveInlayHint, Background),
209    (ResolveInlayHintResponse, Background),
210    (RefreshInlayHints, Foreground),
211    (Ping, Foreground),
212    (PrepareRename, Background),
213    (PrepareRenameResponse, Background),
214    (ExpandProjectEntryResponse, Foreground),
215    (ProjectEntryResponse, Foreground),
216    (RejoinRoom, Foreground),
217    (RejoinRoomResponse, Foreground),
218    (RemoveContact, Foreground),
219    (RemoveChannelMember, Foreground),
220    (RemoveChannelMessage, Foreground),
221    (ReloadBuffers, Foreground),
222    (ReloadBuffersResponse, Foreground),
223    (RemoveProjectCollaborator, Foreground),
224    (RenameProjectEntry, Foreground),
225    (RequestContact, Foreground),
226    (RespondToContactRequest, Foreground),
227    (RespondToChannelInvite, Foreground),
228    (JoinChannel, Foreground),
229    (RoomUpdated, Foreground),
230    (SaveBuffer, Foreground),
231    (RenameChannel, Foreground),
232    (RenameChannelResponse, Foreground),
233    (SetChannelMemberAdmin, Foreground),
234    (SearchProject, Background),
235    (SearchProjectResponse, Background),
236    (ShareProject, Foreground),
237    (ShareProjectResponse, Foreground),
238    (ShowContacts, Foreground),
239    (StartLanguageServer, Foreground),
240    (SynchronizeBuffers, Foreground),
241    (SynchronizeBuffersResponse, Foreground),
242    (RejoinChannelBuffers, Foreground),
243    (RejoinChannelBuffersResponse, Foreground),
244    (Test, Foreground),
245    (Unfollow, Foreground),
246    (UnshareProject, Foreground),
247    (UpdateBuffer, Foreground),
248    (UpdateBufferFile, Foreground),
249    (UpdateContacts, Foreground),
250    (DeleteChannel, Foreground),
251    (MoveChannel, Foreground),
252    (LinkChannel, Foreground),
253    (UnlinkChannel, Foreground),
254    (UpdateChannels, Foreground),
255    (UpdateDiagnosticSummary, Foreground),
256    (UpdateFollowers, Foreground),
257    (UpdateInviteInfo, Foreground),
258    (UpdateLanguageServer, Foreground),
259    (UpdateParticipantLocation, Foreground),
260    (UpdateProject, Foreground),
261    (UpdateProjectCollaborator, Foreground),
262    (UpdateWorktree, Foreground),
263    (UpdateWorktreeSettings, Foreground),
264    (UpdateDiffBase, Foreground),
265    (GetPrivateUserInfo, Foreground),
266    (GetPrivateUserInfoResponse, Foreground),
267    (GetChannelMembers, Foreground),
268    (GetChannelMembersResponse, Foreground),
269    (JoinChannelBuffer, Foreground),
270    (JoinChannelBufferResponse, Foreground),
271    (LeaveChannelBuffer, Background),
272    (UpdateChannelBuffer, Foreground),
273    (UpdateChannelBufferCollaborators, Foreground),
274    (AckBufferOperation, Background),
275    (AckChannelMessage, Background),
276);
277
278request_messages!(
279    (ApplyCodeAction, ApplyCodeActionResponse),
280    (
281        ApplyCompletionAdditionalEdits,
282        ApplyCompletionAdditionalEditsResponse
283    ),
284    (Call, Ack),
285    (CancelCall, Ack),
286    (CopyProjectEntry, ProjectEntryResponse),
287    (CreateProjectEntry, ProjectEntryResponse),
288    (CreateRoom, CreateRoomResponse),
289    (CreateChannel, CreateChannelResponse),
290    (DeclineCall, Ack),
291    (DeleteProjectEntry, ProjectEntryResponse),
292    (ExpandProjectEntry, ExpandProjectEntryResponse),
293    (Follow, FollowResponse),
294    (FormatBuffers, FormatBuffersResponse),
295    (GetCodeActions, GetCodeActionsResponse),
296    (GetHover, GetHoverResponse),
297    (GetCompletions, GetCompletionsResponse),
298    (GetDefinition, GetDefinitionResponse),
299    (GetTypeDefinition, GetTypeDefinitionResponse),
300    (GetDocumentHighlights, GetDocumentHighlightsResponse),
301    (GetReferences, GetReferencesResponse),
302    (GetPrivateUserInfo, GetPrivateUserInfoResponse),
303    (GetProjectSymbols, GetProjectSymbolsResponse),
304    (FuzzySearchUsers, UsersResponse),
305    (GetUsers, UsersResponse),
306    (InviteChannelMember, Ack),
307    (JoinProject, JoinProjectResponse),
308    (JoinRoom, JoinRoomResponse),
309    (JoinChannelChat, JoinChannelChatResponse),
310    (LeaveRoom, Ack),
311    (RejoinRoom, RejoinRoomResponse),
312    (IncomingCall, Ack),
313    (OpenBufferById, OpenBufferResponse),
314    (OpenBufferByPath, OpenBufferResponse),
315    (OpenBufferForSymbol, OpenBufferForSymbolResponse),
316    (Ping, Ack),
317    (PerformRename, PerformRenameResponse),
318    (PrepareRename, PrepareRenameResponse),
319    (OnTypeFormatting, OnTypeFormattingResponse),
320    (InlayHints, InlayHintsResponse),
321    (ResolveInlayHint, ResolveInlayHintResponse),
322    (RefreshInlayHints, Ack),
323    (ReloadBuffers, ReloadBuffersResponse),
324    (RequestContact, Ack),
325    (RemoveChannelMember, Ack),
326    (RemoveContact, Ack),
327    (RespondToContactRequest, Ack),
328    (RespondToChannelInvite, Ack),
329    (SetChannelMemberAdmin, Ack),
330    (SendChannelMessage, SendChannelMessageResponse),
331    (GetChannelMessages, GetChannelMessagesResponse),
332    (GetChannelMembers, GetChannelMembersResponse),
333    (JoinChannel, JoinRoomResponse),
334    (RemoveChannelMessage, Ack),
335    (DeleteChannel, Ack),
336    (RenameProjectEntry, ProjectEntryResponse),
337    (RenameChannel, RenameChannelResponse),
338    (LinkChannel, Ack),
339    (UnlinkChannel, Ack),
340    (MoveChannel, Ack),
341    (SaveBuffer, BufferSaved),
342    (SearchProject, SearchProjectResponse),
343    (ShareProject, ShareProjectResponse),
344    (SynchronizeBuffers, SynchronizeBuffersResponse),
345    (RejoinChannelBuffers, RejoinChannelBuffersResponse),
346    (Test, Test),
347    (UpdateBuffer, Ack),
348    (UpdateParticipantLocation, Ack),
349    (UpdateProject, Ack),
350    (UpdateWorktree, Ack),
351    (JoinChannelBuffer, JoinChannelBufferResponse),
352    (LeaveChannelBuffer, Ack)
353);
354
355entity_messages!(
356    project_id,
357    AddProjectCollaborator,
358    ApplyCodeAction,
359    ApplyCompletionAdditionalEdits,
360    BufferReloaded,
361    BufferSaved,
362    CopyProjectEntry,
363    CreateBufferForPeer,
364    CreateProjectEntry,
365    DeleteProjectEntry,
366    ExpandProjectEntry,
367    FormatBuffers,
368    GetCodeActions,
369    GetCompletions,
370    GetDefinition,
371    GetTypeDefinition,
372    GetDocumentHighlights,
373    GetHover,
374    GetReferences,
375    GetProjectSymbols,
376    JoinProject,
377    LeaveProject,
378    OpenBufferById,
379    OpenBufferByPath,
380    OpenBufferForSymbol,
381    PerformRename,
382    OnTypeFormatting,
383    InlayHints,
384    ResolveInlayHint,
385    RefreshInlayHints,
386    PrepareRename,
387    ReloadBuffers,
388    RemoveProjectCollaborator,
389    RenameProjectEntry,
390    SaveBuffer,
391    SearchProject,
392    StartLanguageServer,
393    SynchronizeBuffers,
394    UnshareProject,
395    UpdateBuffer,
396    UpdateBufferFile,
397    UpdateDiagnosticSummary,
398    UpdateLanguageServer,
399    UpdateProject,
400    UpdateProjectCollaborator,
401    UpdateWorktree,
402    UpdateWorktreeSettings,
403    UpdateDiffBase
404);
405
406entity_messages!(
407    channel_id,
408    ChannelMessageSent,
409    UpdateChannelBuffer,
410    RemoveChannelMessage,
411    UpdateChannelBufferCollaborators,
412);
413
414const KIB: usize = 1024;
415const MIB: usize = KIB * 1024;
416const MAX_BUFFER_LEN: usize = MIB;
417
418/// A stream of protobuf messages.
419pub struct MessageStream<S> {
420    stream: S,
421    encoding_buffer: Vec<u8>,
422}
423
424#[allow(clippy::large_enum_variant)]
425#[derive(Debug)]
426pub enum Message {
427    Envelope(Envelope),
428    Ping,
429    Pong,
430}
431
432impl<S> MessageStream<S> {
433    pub fn new(stream: S) -> Self {
434        Self {
435            stream,
436            encoding_buffer: Vec::new(),
437        }
438    }
439
440    pub fn inner_mut(&mut self) -> &mut S {
441        &mut self.stream
442    }
443}
444
445impl<S> MessageStream<S>
446where
447    S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
448{
449    pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
450        #[cfg(any(test, feature = "test-support"))]
451        const COMPRESSION_LEVEL: i32 = -7;
452
453        #[cfg(not(any(test, feature = "test-support")))]
454        const COMPRESSION_LEVEL: i32 = 4;
455
456        match message {
457            Message::Envelope(message) => {
458                self.encoding_buffer.reserve(message.encoded_len());
459                message
460                    .encode(&mut self.encoding_buffer)
461                    .map_err(io::Error::from)?;
462                let buffer =
463                    zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
464                        .unwrap();
465
466                self.encoding_buffer.clear();
467                self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
468                self.stream.send(WebSocketMessage::Binary(buffer)).await?;
469            }
470            Message::Ping => {
471                self.stream
472                    .send(WebSocketMessage::Ping(Default::default()))
473                    .await?;
474            }
475            Message::Pong => {
476                self.stream
477                    .send(WebSocketMessage::Pong(Default::default()))
478                    .await?;
479            }
480        }
481
482        Ok(())
483    }
484}
485
486impl<S> MessageStream<S>
487where
488    S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
489{
490    pub async fn read(&mut self) -> Result<Message, anyhow::Error> {
491        while let Some(bytes) = self.stream.next().await {
492            match bytes? {
493                WebSocketMessage::Binary(bytes) => {
494                    zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
495                    let envelope = Envelope::decode(self.encoding_buffer.as_slice())
496                        .map_err(io::Error::from)?;
497
498                    self.encoding_buffer.clear();
499                    self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
500                    return Ok(Message::Envelope(envelope));
501                }
502                WebSocketMessage::Ping(_) => return Ok(Message::Ping),
503                WebSocketMessage::Pong(_) => return Ok(Message::Pong),
504                WebSocketMessage::Close(_) => break,
505                _ => {}
506            }
507        }
508        Err(anyhow!("connection closed"))
509    }
510}
511
512impl From<Timestamp> for SystemTime {
513    fn from(val: Timestamp) -> Self {
514        UNIX_EPOCH
515            .checked_add(Duration::new(val.seconds, val.nanos))
516            .unwrap()
517    }
518}
519
520impl From<SystemTime> for Timestamp {
521    fn from(time: SystemTime) -> Self {
522        let duration = time.duration_since(UNIX_EPOCH).unwrap();
523        Self {
524            seconds: duration.as_secs(),
525            nanos: duration.subsec_nanos(),
526        }
527    }
528}
529
530impl From<u128> for Nonce {
531    fn from(nonce: u128) -> Self {
532        let upper_half = (nonce >> 64) as u64;
533        let lower_half = nonce as u64;
534        Self {
535            upper_half,
536            lower_half,
537        }
538    }
539}
540
541impl From<Nonce> for u128 {
542    fn from(nonce: Nonce) -> Self {
543        let upper_half = (nonce.upper_half as u128) << 64;
544        let lower_half = nonce.lower_half as u128;
545        upper_half | lower_half
546    }
547}
548
549pub fn split_worktree_update(
550    mut message: UpdateWorktree,
551    max_chunk_size: usize,
552) -> impl Iterator<Item = UpdateWorktree> {
553    let mut done_files = false;
554
555    let mut repository_map = message
556        .updated_repositories
557        .into_iter()
558        .map(|repo| (repo.work_directory_id, repo))
559        .collect::<HashMap<_, _>>();
560
561    iter::from_fn(move || {
562        if done_files {
563            return None;
564        }
565
566        let updated_entries_chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
567        let updated_entries: Vec<_> = message
568            .updated_entries
569            .drain(..updated_entries_chunk_size)
570            .collect();
571
572        let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size);
573        let removed_entries = message
574            .removed_entries
575            .drain(..removed_entries_chunk_size)
576            .collect();
577
578        done_files = message.updated_entries.is_empty() && message.removed_entries.is_empty();
579
580        let mut updated_repositories = Vec::new();
581
582        if !repository_map.is_empty() {
583            for entry in &updated_entries {
584                if let Some(repo) = repository_map.remove(&entry.id) {
585                    updated_repositories.push(repo)
586                }
587            }
588        }
589
590        let removed_repositories = if done_files {
591            mem::take(&mut message.removed_repositories)
592        } else {
593            Default::default()
594        };
595
596        if done_files {
597            updated_repositories.extend(mem::take(&mut repository_map).into_values());
598        }
599
600        Some(UpdateWorktree {
601            project_id: message.project_id,
602            worktree_id: message.worktree_id,
603            root_name: message.root_name.clone(),
604            abs_path: message.abs_path.clone(),
605            updated_entries,
606            removed_entries,
607            scan_id: message.scan_id,
608            is_last_update: done_files && message.is_last_update,
609            updated_repositories,
610            removed_repositories,
611        })
612    })
613}
614
615#[cfg(test)]
616mod tests {
617    use super::*;
618
619    #[gpui2::test]
620    async fn test_buffer_size() {
621        let (tx, rx) = futures::channel::mpsc::unbounded();
622        let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
623        sink.write(Message::Envelope(Envelope {
624            payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
625                root_name: "abcdefg".repeat(10),
626                ..Default::default()
627            })),
628            ..Default::default()
629        }))
630        .await
631        .unwrap();
632        assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
633        sink.write(Message::Envelope(Envelope {
634            payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
635                root_name: "abcdefg".repeat(1000000),
636                ..Default::default()
637            })),
638            ..Default::default()
639        }))
640        .await
641        .unwrap();
642        assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
643
644        let mut stream = MessageStream::new(rx.map(anyhow::Ok));
645        stream.read().await.unwrap();
646        assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
647        stream.read().await.unwrap();
648        assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
649    }
650
651    #[gpui2::test]
652    fn test_converting_peer_id_from_and_to_u64() {
653        let peer_id = PeerId {
654            owner_id: 10,
655            id: 3,
656        };
657        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
658        let peer_id = PeerId {
659            owner_id: u32::MAX,
660            id: 3,
661        };
662        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
663        let peer_id = PeerId {
664            owner_id: 10,
665            id: u32::MAX,
666        };
667        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
668        let peer_id = PeerId {
669            owner_id: u32::MAX,
670            id: u32::MAX,
671        };
672        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
673    }
674}