proto.rs

  1use super::{ConnectionId, PeerId, TypedEnvelope};
  2use anyhow::Result;
  3use async_tungstenite::tungstenite::{Error as WebSocketError, Message as WebSocketMessage};
  4use futures::{SinkExt as _, StreamExt as _};
  5use prost::Message;
  6use std::any::{Any, TypeId};
  7use std::{
  8    io,
  9    time::{Duration, SystemTime, UNIX_EPOCH},
 10};
 11
 12include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
 13
 14pub trait EnvelopedMessage: Clone + Sized + Send + Sync + 'static {
 15    const NAME: &'static str;
 16    fn into_envelope(
 17        self,
 18        id: u32,
 19        responding_to: Option<u32>,
 20        original_sender_id: Option<u32>,
 21    ) -> Envelope;
 22    fn from_envelope(envelope: Envelope) -> Option<Self>;
 23}
 24
 25pub trait EntityMessage: EnvelopedMessage {
 26    fn remote_entity_id(&self) -> u64;
 27}
 28
 29pub trait RequestMessage: EnvelopedMessage {
 30    type Response: EnvelopedMessage;
 31}
 32
 33pub trait AnyTypedEnvelope: 'static + Send + Sync {
 34    fn payload_type_id(&self) -> TypeId;
 35    fn payload_type_name(&self) -> &'static str;
 36    fn as_any(&self) -> &dyn Any;
 37    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
 38}
 39
 40impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
 41    fn payload_type_id(&self) -> TypeId {
 42        TypeId::of::<T>()
 43    }
 44
 45    fn payload_type_name(&self) -> &'static str {
 46        T::NAME
 47    }
 48
 49    fn as_any(&self) -> &dyn Any {
 50        self
 51    }
 52
 53    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
 54        self
 55    }
 56}
 57
 58macro_rules! messages {
 59    ($($name:ident),* $(,)?) => {
 60        pub fn build_typed_envelope(sender_id: ConnectionId, envelope: Envelope) -> Option<Box<dyn AnyTypedEnvelope>> {
 61            match envelope.payload {
 62                $(Some(envelope::Payload::$name(payload)) => {
 63                    Some(Box::new(TypedEnvelope {
 64                        sender_id,
 65                        original_sender_id: envelope.original_sender_id.map(PeerId),
 66                        message_id: envelope.id,
 67                        payload,
 68                    }))
 69                }, )*
 70                _ => None
 71            }
 72        }
 73
 74        $(
 75            impl EnvelopedMessage for $name {
 76                const NAME: &'static str = std::stringify!($name);
 77
 78                fn into_envelope(
 79                    self,
 80                    id: u32,
 81                    responding_to: Option<u32>,
 82                    original_sender_id: Option<u32>,
 83                ) -> Envelope {
 84                    Envelope {
 85                        id,
 86                        responding_to,
 87                        original_sender_id,
 88                        payload: Some(envelope::Payload::$name(self)),
 89                    }
 90                }
 91
 92                fn from_envelope(envelope: Envelope) -> Option<Self> {
 93                    if let Some(envelope::Payload::$name(msg)) = envelope.payload {
 94                        Some(msg)
 95                    } else {
 96                        None
 97                    }
 98                }
 99            }
100        )*
101    };
102}
103
104macro_rules! request_messages {
105    ($(($request_name:ident, $response_name:ident)),* $(,)?) => {
106        $(impl RequestMessage for $request_name {
107            type Response = $response_name;
108        })*
109    };
110}
111
112macro_rules! entity_messages {
113    ($id_field:ident, $($name:ident),* $(,)?) => {
114        $(impl EntityMessage for $name {
115            fn remote_entity_id(&self) -> u64 {
116                self.$id_field
117            }
118        })*
119    };
120}
121
122messages!(
123    Ack,
124    AddPeer,
125    BufferSaved,
126    ChannelMessageSent,
127    CloseBuffer,
128    CloseWorktree,
129    Error,
130    GetChannelMessages,
131    GetChannelMessagesResponse,
132    GetChannels,
133    GetChannelsResponse,
134    UpdateCollaborators,
135    GetUsers,
136    GetUsersResponse,
137    JoinChannel,
138    JoinChannelResponse,
139    JoinWorktree,
140    JoinWorktreeResponse,
141    LeaveChannel,
142    OpenBuffer,
143    OpenBufferResponse,
144    OpenWorktree,
145    OpenWorktreeResponse,
146    Ping,
147    RemovePeer,
148    SaveBuffer,
149    SendChannelMessage,
150    SendChannelMessageResponse,
151    ShareWorktree,
152    ShareWorktreeResponse,
153    UnshareWorktree,
154    UpdateBuffer,
155    UpdateWorktree,
156);
157
158request_messages!(
159    (GetChannels, GetChannelsResponse),
160    (GetUsers, GetUsersResponse),
161    (JoinChannel, JoinChannelResponse),
162    (OpenBuffer, OpenBufferResponse),
163    (JoinWorktree, JoinWorktreeResponse),
164    (OpenWorktree, OpenWorktreeResponse),
165    (Ping, Ack),
166    (SaveBuffer, BufferSaved),
167    (UpdateBuffer, Ack),
168    (ShareWorktree, ShareWorktreeResponse),
169    (UnshareWorktree, Ack),
170    (SendChannelMessage, SendChannelMessageResponse),
171    (GetChannelMessages, GetChannelMessagesResponse),
172);
173
174entity_messages!(
175    worktree_id,
176    AddPeer,
177    BufferSaved,
178    CloseBuffer,
179    CloseWorktree,
180    OpenBuffer,
181    JoinWorktree,
182    RemovePeer,
183    SaveBuffer,
184    UnshareWorktree,
185    UpdateBuffer,
186    UpdateWorktree,
187);
188
189entity_messages!(channel_id, ChannelMessageSent);
190
191/// A stream of protobuf messages.
192pub struct MessageStream<S> {
193    stream: S,
194}
195
196impl<S> MessageStream<S> {
197    pub fn new(stream: S) -> Self {
198        Self { stream }
199    }
200
201    pub fn inner_mut(&mut self) -> &mut S {
202        &mut self.stream
203    }
204}
205
206impl<S> MessageStream<S>
207where
208    S: futures::Sink<WebSocketMessage, Error = WebSocketError> + Unpin,
209{
210    /// Write a given protobuf message to the stream.
211    pub async fn write_message(&mut self, message: &Envelope) -> Result<(), WebSocketError> {
212        let mut buffer = Vec::with_capacity(message.encoded_len());
213        message
214            .encode(&mut buffer)
215            .map_err(|err| io::Error::from(err))?;
216        self.stream.send(WebSocketMessage::Binary(buffer)).await?;
217        Ok(())
218    }
219}
220
221impl<S> MessageStream<S>
222where
223    S: futures::Stream<Item = Result<WebSocketMessage, WebSocketError>> + Unpin,
224{
225    /// Read a protobuf message of the given type from the stream.
226    pub async fn read_message(&mut self) -> Result<Envelope, WebSocketError> {
227        while let Some(bytes) = self.stream.next().await {
228            match bytes? {
229                WebSocketMessage::Binary(bytes) => {
230                    let envelope = Envelope::decode(bytes.as_slice()).map_err(io::Error::from)?;
231                    return Ok(envelope);
232                }
233                WebSocketMessage::Close(_) => break,
234                _ => {}
235            }
236        }
237        Err(WebSocketError::ConnectionClosed)
238    }
239}
240
241impl Into<SystemTime> for Timestamp {
242    fn into(self) -> SystemTime {
243        UNIX_EPOCH
244            .checked_add(Duration::new(self.seconds, self.nanos))
245            .unwrap()
246    }
247}
248
249impl From<SystemTime> for Timestamp {
250    fn from(time: SystemTime) -> Self {
251        let duration = time.duration_since(UNIX_EPOCH).unwrap();
252        Self {
253            seconds: duration.as_secs(),
254            nanos: duration.subsec_nanos(),
255        }
256    }
257}
258
259impl From<u128> for Nonce {
260    fn from(nonce: u128) -> Self {
261        let upper_half = (nonce >> 64) as u64;
262        let lower_half = nonce as u64;
263        Self {
264            upper_half,
265            lower_half,
266        }
267    }
268}
269
270impl From<Nonce> for u128 {
271    fn from(nonce: Nonce) -> Self {
272        let upper_half = (nonce.upper_half as u128) << 64;
273        let lower_half = nonce.lower_half as u128;
274        upper_half | lower_half
275    }
276}