proto.rs

  1#![allow(non_snake_case)]
  2
  3use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
  4use anyhow::{anyhow, Result};
  5use async_tungstenite::tungstenite::Message as WebSocketMessage;
  6use collections::HashMap;
  7use futures::{SinkExt as _, StreamExt as _};
  8use prost::Message as _;
  9use serde::Serialize;
 10use std::any::{Any, TypeId};
 11use std::{
 12    cmp,
 13    fmt::Debug,
 14    io, iter,
 15    time::{Duration, SystemTime, UNIX_EPOCH},
 16};
 17use std::{fmt, mem};
 18
 19include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
 20
/// A message type that can be packed into, and recovered from, an
/// [`Envelope`].
pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
    /// Static name of this message type; this is what
    /// `AnyTypedEnvelope::payload_type_name` reports for a typed envelope.
    const NAME: &'static str;
    /// Priority class of this message type; `AnyTypedEnvelope::is_background`
    /// is derived from it.
    const PRIORITY: MessagePriority;
    /// Wraps `self` in an [`Envelope`].
    ///
    /// * `id` - id to assign to the envelope.
    /// * `responding_to` - id of the message this one answers, if any.
    /// * `original_sender_id` - peer that originally sent the message, if any.
    fn into_envelope(
        self,
        id: u32,
        responding_to: Option<u32>,
        original_sender_id: Option<PeerId>,
    ) -> Envelope;
    /// Attempts to extract a message of this type from `envelope`.
    ///
    /// NOTE(review): `None` presumably means the envelope carries a different
    /// payload type; the implementation is generated elsewhere — confirm.
    fn from_envelope(envelope: Envelope) -> Option<Self>;
}
 32
/// An [`EnvelopedMessage`] that is addressed to a particular remote entity.
pub trait EntityMessage: EnvelopedMessage {
    /// Returns the id of the remote entity this message refers to.
    fn remote_entity_id(&self) -> u64;
}
 36
/// An [`EnvelopedMessage`] that is paired with a response message type.
pub trait RequestMessage: EnvelopedMessage {
    /// The message type associated with this request as its response.
    type Response: EnvelopedMessage;
}
 40
/// Type-erased view of a `TypedEnvelope<T>`, allowing envelopes with different
/// payload types to be handled through a single trait object.
pub trait AnyTypedEnvelope: 'static + Send + Sync {
    /// Returns the [`TypeId`] of the payload type.
    fn payload_type_id(&self) -> TypeId;
    /// Returns the static name of the payload type.
    fn payload_type_name(&self) -> &'static str;
    /// Borrows the envelope as [`Any`] so it can be downcast by reference.
    fn as_any(&self) -> &dyn Any;
    /// Converts the boxed envelope into a boxed [`Any`] so it can be downcast
    /// by value.
    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
    /// Returns `true` when the payload type has background priority.
    fn is_background(&self) -> bool;
    /// Returns the peer that originally sent the message, if recorded.
    fn original_sender_id(&self) -> Option<PeerId>;
    /// Returns the id of the connection the envelope is associated with as
    /// its sender.
    fn sender_id(&self) -> ConnectionId;
    /// Returns the id of the message.
    fn message_id(&self) -> u32;
}
 51
/// Priority class attached to every message type through
/// `EnvelopedMessage::PRIORITY`.
///
/// NOTE(review): how the two classes are scheduled is decided by code outside
/// this file — confirm before relying on any ordering.
pub enum MessagePriority {
    /// Messages for which `AnyTypedEnvelope::is_background` is `false`.
    Foreground,
    /// Messages for which `AnyTypedEnvelope::is_background` is `true`.
    Background,
}
 56
 57impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
 58    fn payload_type_id(&self) -> TypeId {
 59        TypeId::of::<T>()
 60    }
 61
 62    fn payload_type_name(&self) -> &'static str {
 63        T::NAME
 64    }
 65
 66    fn as_any(&self) -> &dyn Any {
 67        self
 68    }
 69
 70    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
 71        self
 72    }
 73
 74    fn is_background(&self) -> bool {
 75        matches!(T::PRIORITY, MessagePriority::Background)
 76    }
 77
 78    fn original_sender_id(&self) -> Option<PeerId> {
 79        self.original_sender_id
 80    }
 81
 82    fn sender_id(&self) -> ConnectionId {
 83        self.sender_id
 84    }
 85
 86    fn message_id(&self) -> u32 {
 87        self.message_id
 88    }
 89}
 90
 91impl PeerId {
 92    pub fn from_u64(peer_id: u64) -> Self {
 93        let owner_id = (peer_id >> 32) as u32;
 94        let id = peer_id as u32;
 95        Self { owner_id, id }
 96    }
 97
 98    pub fn as_u64(self) -> u64 {
 99        ((self.owner_id as u64) << 32) | (self.id as u64)
100    }
101}
102
// Marker impl: `PeerId` values may be copied bitwise.
impl Copy for PeerId {}

// Marker impl: equality on `PeerId` is a full equivalence relation.
impl Eq for PeerId {}
106
107impl Ord for PeerId {
108    fn cmp(&self, other: &Self) -> cmp::Ordering {
109        self.owner_id
110            .cmp(&other.owner_id)
111            .then_with(|| self.id.cmp(&other.id))
112    }
113}
114
115impl PartialOrd for PeerId {
116    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
117        Some(self.cmp(other))
118    }
119}
120
121impl std::hash::Hash for PeerId {
122    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
123        self.owner_id.hash(state);
124        self.id.hash(state);
125    }
126}
127
128impl fmt::Display for PeerId {
129    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130        write!(f, "{}/{}", self.owner_id, self.id)
131    }
132}
133
// Table of every message type together with its priority class, handed to the
// `messages!` macro imported from the parent module.
// NOTE(review): the macro body is not visible here; it presumably generates
// the `EnvelopedMessage` impl for each listed type — confirm in the parent
// module. The entries are not in alphabetical order.
messages!(
    (Ack, Foreground),
    (AddProjectCollaborator, Foreground),
    (ApplyCodeAction, Background),
    (ApplyCodeActionResponse, Background),
    (ApplyCompletionAdditionalEdits, Background),
    (ApplyCompletionAdditionalEditsResponse, Background),
    (BufferReloaded, Foreground),
    (BufferSaved, Foreground),
    (Call, Foreground),
    (CallCanceled, Foreground),
    (CancelCall, Foreground),
    (CopyProjectEntry, Foreground),
    (CreateBufferForPeer, Foreground),
    (CreateChannel, Foreground),
    (ChannelResponse, Foreground),
    (CreateProjectEntry, Foreground),
    (CreateRoom, Foreground),
    (CreateRoomResponse, Foreground),
    (DeclineCall, Foreground),
    (DeleteProjectEntry, Foreground),
    (Error, Foreground),
    (ExpandProjectEntry, Foreground),
    (Follow, Foreground),
    (FollowResponse, Foreground),
    (FormatBuffers, Foreground),
    (FormatBuffersResponse, Foreground),
    (FuzzySearchUsers, Foreground),
    (GetCodeActions, Background),
    (GetCodeActionsResponse, Background),
    (GetHover, Background),
    (GetHoverResponse, Background),
    (GetCompletions, Background),
    (GetCompletionsResponse, Background),
    (GetDefinition, Background),
    (GetDefinitionResponse, Background),
    (GetTypeDefinition, Background),
    (GetTypeDefinitionResponse, Background),
    (GetDocumentHighlights, Background),
    (GetDocumentHighlightsResponse, Background),
    (GetReferences, Background),
    (GetReferencesResponse, Background),
    (GetProjectSymbols, Background),
    (GetProjectSymbolsResponse, Background),
    (GetUsers, Foreground),
    (Hello, Foreground),
    (IncomingCall, Foreground),
    (InviteChannelMember, Foreground),
    (UsersResponse, Foreground),
    (JoinProject, Foreground),
    (JoinProjectResponse, Foreground),
    (JoinRoom, Foreground),
    (JoinRoomResponse, Foreground),
    (LeaveProject, Foreground),
    (LeaveRoom, Foreground),
    (OpenBufferById, Background),
    (OpenBufferByPath, Background),
    (OpenBufferForSymbol, Background),
    (OpenBufferForSymbolResponse, Background),
    (OpenBufferResponse, Background),
    (PerformRename, Background),
    (PerformRenameResponse, Background),
    (OnTypeFormatting, Background),
    (OnTypeFormattingResponse, Background),
    (InlayHints, Background),
    (InlayHintsResponse, Background),
    (ResolveInlayHint, Background),
    (ResolveInlayHintResponse, Background),
    (RefreshInlayHints, Foreground),
    (Ping, Foreground),
    (PrepareRename, Background),
    (PrepareRenameResponse, Background),
    (ExpandProjectEntryResponse, Foreground),
    (ProjectEntryResponse, Foreground),
    (RejoinRoom, Foreground),
    (RejoinRoomResponse, Foreground),
    (RemoveContact, Foreground),
    (RemoveChannelMember, Foreground),
    (ReloadBuffers, Foreground),
    (ReloadBuffersResponse, Foreground),
    (RemoveProjectCollaborator, Foreground),
    (RenameProjectEntry, Foreground),
    (RequestContact, Foreground),
    (RespondToContactRequest, Foreground),
    (RespondToChannelInvite, Foreground),
    (JoinChannel, Foreground),
    (RoomUpdated, Foreground),
    (SaveBuffer, Foreground),
    (RenameChannel, Foreground),
    (SetChannelMemberAdmin, Foreground),
    (SearchProject, Background),
    (SearchProjectResponse, Background),
    (ShareProject, Foreground),
    (ShareProjectResponse, Foreground),
    (ShowContacts, Foreground),
    (StartLanguageServer, Foreground),
    (SynchronizeBuffers, Foreground),
    (SynchronizeBuffersResponse, Foreground),
    (RejoinChannelBuffers, Foreground),
    (RejoinChannelBuffersResponse, Foreground),
    (Test, Foreground),
    (Unfollow, Foreground),
    (UnshareProject, Foreground),
    (UpdateBuffer, Foreground),
    (UpdateBufferFile, Foreground),
    (UpdateContacts, Foreground),
    (RemoveChannel, Foreground),
    (UpdateChannels, Foreground),
    (UpdateDiagnosticSummary, Foreground),
    (UpdateFollowers, Foreground),
    (UpdateInviteInfo, Foreground),
    (UpdateLanguageServer, Foreground),
    (UpdateParticipantLocation, Foreground),
    (UpdateProject, Foreground),
    (UpdateProjectCollaborator, Foreground),
    (UpdateWorktree, Foreground),
    (UpdateWorktreeSettings, Foreground),
    (UpdateDiffBase, Foreground),
    (GetPrivateUserInfo, Foreground),
    (GetPrivateUserInfoResponse, Foreground),
    (GetChannelMembers, Foreground),
    (GetChannelMembersResponse, Foreground),
    (JoinChannelBuffer, Foreground),
    (JoinChannelBufferResponse, Foreground),
    (LeaveChannelBuffer, Background),
    (UpdateChannelBuffer, Foreground),
    (RemoveChannelBufferCollaborator, Foreground),
    (AddChannelBufferCollaborator, Foreground),
    (UpdateChannelBufferCollaborator, Foreground),
);
264
// Table of `(Request, Response)` pairs handed to the `request_messages!` macro
// imported from the parent module.
// NOTE(review): the macro body is not visible here; it presumably implements
// `RequestMessage` for each request with the paired type as `Response` —
// confirm in the parent module. Several requests share a response type (for
// example `Ack` and `ProjectEntryResponse`).
request_messages!(
    (ApplyCodeAction, ApplyCodeActionResponse),
    (
        ApplyCompletionAdditionalEdits,
        ApplyCompletionAdditionalEditsResponse
    ),
    (Call, Ack),
    (CancelCall, Ack),
    (CopyProjectEntry, ProjectEntryResponse),
    (CreateProjectEntry, ProjectEntryResponse),
    (CreateRoom, CreateRoomResponse),
    (CreateChannel, ChannelResponse),
    (DeclineCall, Ack),
    (DeleteProjectEntry, ProjectEntryResponse),
    (ExpandProjectEntry, ExpandProjectEntryResponse),
    (Follow, FollowResponse),
    (FormatBuffers, FormatBuffersResponse),
    (GetCodeActions, GetCodeActionsResponse),
    (GetHover, GetHoverResponse),
    (GetCompletions, GetCompletionsResponse),
    (GetDefinition, GetDefinitionResponse),
    (GetTypeDefinition, GetTypeDefinitionResponse),
    (GetDocumentHighlights, GetDocumentHighlightsResponse),
    (GetReferences, GetReferencesResponse),
    (GetPrivateUserInfo, GetPrivateUserInfoResponse),
    (GetProjectSymbols, GetProjectSymbolsResponse),
    (FuzzySearchUsers, UsersResponse),
    (GetUsers, UsersResponse),
    (InviteChannelMember, Ack),
    (JoinProject, JoinProjectResponse),
    (JoinRoom, JoinRoomResponse),
    (LeaveRoom, Ack),
    (RejoinRoom, RejoinRoomResponse),
    (IncomingCall, Ack),
    (OpenBufferById, OpenBufferResponse),
    (OpenBufferByPath, OpenBufferResponse),
    (OpenBufferForSymbol, OpenBufferForSymbolResponse),
    (Ping, Ack),
    (PerformRename, PerformRenameResponse),
    (PrepareRename, PrepareRenameResponse),
    (OnTypeFormatting, OnTypeFormattingResponse),
    (InlayHints, InlayHintsResponse),
    (ResolveInlayHint, ResolveInlayHintResponse),
    (RefreshInlayHints, Ack),
    (ReloadBuffers, ReloadBuffersResponse),
    (RequestContact, Ack),
    (RemoveChannelMember, Ack),
    (RemoveContact, Ack),
    (RespondToContactRequest, Ack),
    (RespondToChannelInvite, Ack),
    (SetChannelMemberAdmin, Ack),
    (GetChannelMembers, GetChannelMembersResponse),
    (JoinChannel, JoinRoomResponse),
    (RemoveChannel, Ack),
    (RenameProjectEntry, ProjectEntryResponse),
    (RenameChannel, ChannelResponse),
    (SaveBuffer, BufferSaved),
    (SearchProject, SearchProjectResponse),
    (ShareProject, ShareProjectResponse),
    (SynchronizeBuffers, SynchronizeBuffersResponse),
    (RejoinChannelBuffers, RejoinChannelBuffersResponse),
    (Test, Test),
    (UpdateBuffer, Ack),
    (UpdateParticipantLocation, Ack),
    (UpdateProject, Ack),
    (UpdateWorktree, Ack),
    (JoinChannelBuffer, JoinChannelBufferResponse),
    (LeaveChannelBuffer, Ack)
);
334
// Message types passed to `entity_messages!` with `project_id` as the first
// argument.
// NOTE(review): the macro body is not visible here; it presumably implements
// `EntityMessage` for each type with `remote_entity_id` reading the
// `project_id` field — confirm in the parent module.
entity_messages!(
    project_id,
    AddProjectCollaborator,
    ApplyCodeAction,
    ApplyCompletionAdditionalEdits,
    BufferReloaded,
    BufferSaved,
    CopyProjectEntry,
    CreateBufferForPeer,
    CreateProjectEntry,
    DeleteProjectEntry,
    ExpandProjectEntry,
    Follow,
    FormatBuffers,
    GetCodeActions,
    GetCompletions,
    GetDefinition,
    GetTypeDefinition,
    GetDocumentHighlights,
    GetHover,
    GetReferences,
    GetProjectSymbols,
    JoinProject,
    LeaveProject,
    OpenBufferById,
    OpenBufferByPath,
    OpenBufferForSymbol,
    PerformRename,
    OnTypeFormatting,
    InlayHints,
    ResolveInlayHint,
    RefreshInlayHints,
    PrepareRename,
    ReloadBuffers,
    RemoveProjectCollaborator,
    RenameProjectEntry,
    SaveBuffer,
    SearchProject,
    StartLanguageServer,
    SynchronizeBuffers,
    Unfollow,
    UnshareProject,
    UpdateBuffer,
    UpdateBufferFile,
    UpdateDiagnosticSummary,
    UpdateFollowers,
    UpdateLanguageServer,
    UpdateProject,
    UpdateProjectCollaborator,
    UpdateWorktree,
    UpdateWorktreeSettings,
    UpdateDiffBase
);
388
// Message types passed to `entity_messages!` with `channel_id` as the first
// argument.
// NOTE(review): presumably these become `EntityMessage`s whose
// `remote_entity_id` reads the `channel_id` field — confirm against the macro
// definition in the parent module.
entity_messages!(
    channel_id,
    UpdateChannelBuffer,
    RemoveChannelBufferCollaborator,
    AddChannelBufferCollaborator,
    UpdateChannelBufferCollaborator
);
396
/// Number of bytes in one kibibyte.
const KIB: usize = 1024;
/// Number of bytes in one mebibyte.
const MIB: usize = KIB * 1024;
/// Capacity (in bytes) that `MessageStream`'s encoding buffer is shrunk back
/// to after each message is written or read.
const MAX_BUFFER_LEN: usize = MIB;
400
/// A stream of protobuf messages.
///
/// Wraps an underlying transport `S` and converts between [`Message`] values
/// and zstd-compressed binary WebSocket frames.
pub struct MessageStream<S> {
    /// The wrapped transport (a sink and/or stream of WebSocket messages).
    stream: S,
    /// Scratch buffer holding the uncompressed protobuf bytes of the message
    /// currently being written or read; reused across calls.
    encoding_buffer: Vec<u8>,
}
406
/// A unit of traffic exchanged through a `MessageStream`.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum Message {
    /// A protobuf envelope, carried as a compressed binary WebSocket frame.
    Envelope(Envelope),
    /// A WebSocket ping frame (any frame payload is not preserved).
    Ping,
    /// A WebSocket pong frame (any frame payload is not preserved).
    Pong,
}
414
415impl<S> MessageStream<S> {
416    pub fn new(stream: S) -> Self {
417        Self {
418            stream,
419            encoding_buffer: Vec::new(),
420        }
421    }
422
423    pub fn inner_mut(&mut self) -> &mut S {
424        &mut self.stream
425    }
426}
427
428impl<S> MessageStream<S>
429where
430    S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
431{
432    pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
433        #[cfg(any(test, feature = "test-support"))]
434        const COMPRESSION_LEVEL: i32 = -7;
435
436        #[cfg(not(any(test, feature = "test-support")))]
437        const COMPRESSION_LEVEL: i32 = 4;
438
439        match message {
440            Message::Envelope(message) => {
441                self.encoding_buffer.reserve(message.encoded_len());
442                message
443                    .encode(&mut self.encoding_buffer)
444                    .map_err(io::Error::from)?;
445                let buffer =
446                    zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
447                        .unwrap();
448
449                self.encoding_buffer.clear();
450                self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
451                self.stream.send(WebSocketMessage::Binary(buffer)).await?;
452            }
453            Message::Ping => {
454                self.stream
455                    .send(WebSocketMessage::Ping(Default::default()))
456                    .await?;
457            }
458            Message::Pong => {
459                self.stream
460                    .send(WebSocketMessage::Pong(Default::default()))
461                    .await?;
462            }
463        }
464
465        Ok(())
466    }
467}
468
469impl<S> MessageStream<S>
470where
471    S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
472{
473    pub async fn read(&mut self) -> Result<Message, anyhow::Error> {
474        while let Some(bytes) = self.stream.next().await {
475            match bytes? {
476                WebSocketMessage::Binary(bytes) => {
477                    zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
478                    let envelope = Envelope::decode(self.encoding_buffer.as_slice())
479                        .map_err(io::Error::from)?;
480
481                    self.encoding_buffer.clear();
482                    self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
483                    return Ok(Message::Envelope(envelope));
484                }
485                WebSocketMessage::Ping(_) => return Ok(Message::Ping),
486                WebSocketMessage::Pong(_) => return Ok(Message::Pong),
487                WebSocketMessage::Close(_) => break,
488                _ => {}
489            }
490        }
491        Err(anyhow!("connection closed"))
492    }
493}
494
495impl From<Timestamp> for SystemTime {
496    fn from(val: Timestamp) -> Self {
497        UNIX_EPOCH
498            .checked_add(Duration::new(val.seconds, val.nanos))
499            .unwrap()
500    }
501}
502
503impl From<SystemTime> for Timestamp {
504    fn from(time: SystemTime) -> Self {
505        let duration = time.duration_since(UNIX_EPOCH).unwrap();
506        Self {
507            seconds: duration.as_secs(),
508            nanos: duration.subsec_nanos(),
509        }
510    }
511}
512
513impl From<u128> for Nonce {
514    fn from(nonce: u128) -> Self {
515        let upper_half = (nonce >> 64) as u64;
516        let lower_half = nonce as u64;
517        Self {
518            upper_half,
519            lower_half,
520        }
521    }
522}
523
524impl From<Nonce> for u128 {
525    fn from(nonce: Nonce) -> Self {
526        let upper_half = (nonce.upper_half as u128) << 64;
527        let lower_half = nonce.lower_half as u128;
528        upper_half | lower_half
529    }
530}
531
532pub fn split_worktree_update(
533    mut message: UpdateWorktree,
534    max_chunk_size: usize,
535) -> impl Iterator<Item = UpdateWorktree> {
536    let mut done_files = false;
537
538    let mut repository_map = message
539        .updated_repositories
540        .into_iter()
541        .map(|repo| (repo.work_directory_id, repo))
542        .collect::<HashMap<_, _>>();
543
544    iter::from_fn(move || {
545        if done_files {
546            return None;
547        }
548
549        let updated_entries_chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
550        let updated_entries: Vec<_> = message
551            .updated_entries
552            .drain(..updated_entries_chunk_size)
553            .collect();
554
555        let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size);
556        let removed_entries = message
557            .removed_entries
558            .drain(..removed_entries_chunk_size)
559            .collect();
560
561        done_files = message.updated_entries.is_empty() && message.removed_entries.is_empty();
562
563        let mut updated_repositories = Vec::new();
564
565        if !repository_map.is_empty() {
566            for entry in &updated_entries {
567                if let Some(repo) = repository_map.remove(&entry.id) {
568                    updated_repositories.push(repo)
569                }
570            }
571        }
572
573        let removed_repositories = if done_files {
574            mem::take(&mut message.removed_repositories)
575        } else {
576            Default::default()
577        };
578
579        if done_files {
580            updated_repositories.extend(mem::take(&mut repository_map).into_values());
581        }
582
583        Some(UpdateWorktree {
584            project_id: message.project_id,
585            worktree_id: message.worktree_id,
586            root_name: message.root_name.clone(),
587            abs_path: message.abs_path.clone(),
588            updated_entries,
589            removed_entries,
590            scan_id: message.scan_id,
591            is_last_update: done_files && message.is_last_update,
592            updated_repositories,
593            removed_repositories,
594        })
595    })
596}
597
#[cfg(test)]
mod tests {
    use super::*;

    // Checks that the reusable encoding buffer never retains more than
    // `MAX_BUFFER_LEN` bytes of capacity after writing or reading a message,
    // for both a small envelope and one far larger than the limit.
    #[gpui::test]
    async fn test_buffer_size() {
        let (tx, rx) = futures::channel::mpsc::unbounded();
        let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
        // Small message: 70 bytes of root name.
        sink.write(Message::Envelope(Envelope {
            payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
                root_name: "abcdefg".repeat(10),
                ..Default::default()
            })),
            ..Default::default()
        }))
        .await
        .unwrap();
        assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
        // Large message: 7,000,000 bytes of root name, well above the limit.
        sink.write(Message::Envelope(Envelope {
            payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
                root_name: "abcdefg".repeat(1000000),
                ..Default::default()
            })),
            ..Default::default()
        }))
        .await
        .unwrap();
        assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);

        // Read both messages back through the receiving half of the channel.
        let mut stream = MessageStream::new(rx.map(anyhow::Ok));
        stream.read().await.unwrap();
        assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
        stream.read().await.unwrap();
        assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
    }

    // Checks that `PeerId::as_u64` followed by `PeerId::from_u64` round-trips,
    // including the `u32::MAX` boundary for each half.
    #[gpui::test]
    fn test_converting_peer_id_from_and_to_u64() {
        let peer_id = PeerId {
            owner_id: 10,
            id: 3,
        };
        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
        let peer_id = PeerId {
            owner_id: u32::MAX,
            id: 3,
        };
        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
        let peer_id = PeerId {
            owner_id: 10,
            id: u32::MAX,
        };
        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
        let peer_id = PeerId {
            owner_id: u32::MAX,
            id: u32::MAX,
        };
        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
    }
}