1use super::{ConnectionId, PeerId, TypedEnvelope};
2use anyhow::Result;
3use async_tungstenite::tungstenite::{Error as WebSocketError, Message as WebSocketMessage};
4use futures::{SinkExt as _, StreamExt as _};
5use prost::Message;
6use std::any::{Any, TypeId};
7use std::{
8 io,
9 time::{Duration, SystemTime, UNIX_EPOCH},
10};
11
12include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
13
/// A message type that can be wrapped in, and extracted from, an [`Envelope`].
///
/// Implementations are generated by the `messages!` macro in this file, one
/// for every payload type listed in its invocation.
pub trait EnvelopedMessage: Clone + Sized + Send + Sync + 'static {
    /// The message type's name. The `messages!` macro sets this to the
    /// stringified type identifier.
    const NAME: &'static str;

    /// Wraps `self` as the payload of an [`Envelope`] whose `id`,
    /// `responding_to` and `original_sender_id` fields are set to the given
    /// values.
    fn into_envelope(
        self,
        id: u32,
        responding_to: Option<u32>,
        original_sender_id: Option<u32>,
    ) -> Envelope;

    /// Extracts a message of this type from `envelope`.
    ///
    /// Returns `None` when the envelope has no payload or its payload is a
    /// different message type.
    fn from_envelope(envelope: Envelope) -> Option<Self>;
}
24
/// An [`EnvelopedMessage`] that carries the id of an entity.
///
/// Implementations are generated by the `entity_messages!` macro in this file.
pub trait EntityMessage: EnvelopedMessage {
    /// Returns the entity id stored in this message. The generated impls read
    /// it from an id field of the message (`worktree_id` or `channel_id`).
    fn remote_entity_id(&self) -> u64;
}
28
/// An [`EnvelopedMessage`] that has an associated response message type.
///
/// The request/response pairs are declared with the `request_messages!` macro
/// in this file.
pub trait RequestMessage: EnvelopedMessage {
    /// The message type paired with this request as its response.
    type Response: EnvelopedMessage;
}
32
/// A type-erased envelope whose concrete payload type is only known at
/// runtime.
///
/// Implemented in this file for every `TypedEnvelope<T>` where
/// `T: EnvelopedMessage`.
pub trait AnyTypedEnvelope: 'static + Send + Sync {
    /// Returns the [`TypeId`] of the payload type.
    fn payload_type_id(&self) -> TypeId;

    /// Returns the name of the payload type.
    fn payload_type_name(&self) -> &'static str;

    /// Borrows this envelope as `&dyn Any`.
    fn as_any(&self) -> &dyn Any;

    /// Converts this boxed envelope into a `Box<dyn Any + Send + Sync>`.
    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
}
39
40impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
41 fn payload_type_id(&self) -> TypeId {
42 TypeId::of::<T>()
43 }
44
45 fn payload_type_name(&self) -> &'static str {
46 T::NAME
47 }
48
49 fn as_any(&self) -> &dyn Any {
50 self
51 }
52
53 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
54 self
55 }
56}
57
/// Registers the message types that can be carried inside an `Envelope`.
///
/// Given a list of `envelope::Payload` variant names (which are also the
/// message type names), this generates:
///
/// * `build_typed_envelope`, which turns an `Envelope` into a boxed
///   `TypedEnvelope` of the matching payload type, and
/// * an `EnvelopedMessage` impl for each listed type.
macro_rules! messages {
    ($($name:ident),* $(,)?) => {
        /// Converts `envelope` into a type-erased `TypedEnvelope` tagged with
        /// `sender_id`. Returns `None` when the envelope has no payload or its
        /// payload is not one of the registered message types.
        pub fn build_typed_envelope(
            sender_id: ConnectionId,
            envelope: Envelope,
        ) -> Option<Box<dyn AnyTypedEnvelope>> {
            let Envelope {
                id,
                original_sender_id,
                payload,
                ..
            } = envelope;

            match payload {
                $(
                    Some(envelope::Payload::$name(payload)) => Some(Box::new(TypedEnvelope {
                        sender_id,
                        original_sender_id: original_sender_id.map(PeerId),
                        message_id: id,
                        payload,
                    })),
                )*
                _ => None,
            }
        }

        $(
            impl EnvelopedMessage for $name {
                const NAME: &'static str = std::stringify!($name);

                fn into_envelope(
                    self,
                    id: u32,
                    responding_to: Option<u32>,
                    original_sender_id: Option<u32>,
                ) -> Envelope {
                    let payload = Some(envelope::Payload::$name(self));
                    Envelope {
                        id,
                        responding_to,
                        original_sender_id,
                        payload,
                    }
                }

                fn from_envelope(envelope: Envelope) -> Option<Self> {
                    match envelope.payload {
                        Some(envelope::Payload::$name(message)) => Some(message),
                        _ => None,
                    }
                }
            }
        )*
    };
}
103
/// Implements `RequestMessage` for each `(Request, Response)` pair, using the
/// second type of the pair as the request's `Response` type.
macro_rules! request_messages {
    ($(($request:ident, $response:ident)),* $(,)?) => {
        $(
            impl RequestMessage for $request {
                type Response = $response;
            }
        )*
    };
}
111
/// Implements `EntityMessage` for each listed message type.
///
/// The first argument names the field that holds the entity id; every listed
/// type returns that field from `remote_entity_id`.
macro_rules! entity_messages {
    ($id_field:ident, $($message:ident),* $(,)?) => {
        $(
            impl EntityMessage for $message {
                fn remote_entity_id(&self) -> u64 {
                    self.$id_field
                }
            }
        )*
    };
}
121
122messages!(
123 Ack,
124 AddPeer,
125 BufferSaved,
126 ChannelMessageSent,
127 CloseBuffer,
128 CloseWorktree,
129 Error,
130 GetChannelMessages,
131 GetChannelMessagesResponse,
132 GetChannels,
133 GetChannelsResponse,
134 UpdateCollaborators,
135 GetUsers,
136 GetUsersResponse,
137 JoinChannel,
138 JoinChannelResponse,
139 JoinWorktree,
140 JoinWorktreeResponse,
141 LeaveChannel,
142 OpenBuffer,
143 OpenBufferResponse,
144 OpenWorktree,
145 OpenWorktreeResponse,
146 Ping,
147 RemovePeer,
148 SaveBuffer,
149 SendChannelMessage,
150 SendChannelMessageResponse,
151 ShareWorktree,
152 ShareWorktreeResponse,
153 UnshareWorktree,
154 UpdateBuffer,
155 UpdateWorktree,
156);
157
158request_messages!(
159 (GetChannels, GetChannelsResponse),
160 (GetUsers, GetUsersResponse),
161 (JoinChannel, JoinChannelResponse),
162 (OpenBuffer, OpenBufferResponse),
163 (JoinWorktree, JoinWorktreeResponse),
164 (OpenWorktree, OpenWorktreeResponse),
165 (Ping, Ack),
166 (SaveBuffer, BufferSaved),
167 (UpdateBuffer, Ack),
168 (ShareWorktree, ShareWorktreeResponse),
169 (UnshareWorktree, Ack),
170 (SendChannelMessage, SendChannelMessageResponse),
171 (GetChannelMessages, GetChannelMessagesResponse),
172);
173
174entity_messages!(
175 worktree_id,
176 AddPeer,
177 BufferSaved,
178 CloseBuffer,
179 CloseWorktree,
180 OpenBuffer,
181 JoinWorktree,
182 RemovePeer,
183 SaveBuffer,
184 UnshareWorktree,
185 UpdateBuffer,
186 UpdateWorktree,
187);
188
189entity_messages!(channel_id, ChannelMessageSent);
190
/// A stream of protobuf messages.
///
/// Wraps an underlying transport `S`. When `S` is a sink of WebSocket
/// messages, `write_message` is available; when `S` is a stream of WebSocket
/// messages, `read_message` is available.
pub struct MessageStream<S> {
    // The wrapped transport; reachable from outside through `inner_mut`.
    stream: S,
}
195
196impl<S> MessageStream<S> {
197 pub fn new(stream: S) -> Self {
198 Self { stream }
199 }
200
201 pub fn inner_mut(&mut self) -> &mut S {
202 &mut self.stream
203 }
204}
205
206impl<S> MessageStream<S>
207where
208 S: futures::Sink<WebSocketMessage, Error = WebSocketError> + Unpin,
209{
210 /// Write a given protobuf message to the stream.
211 pub async fn write_message(&mut self, message: &Envelope) -> Result<(), WebSocketError> {
212 let mut buffer = Vec::with_capacity(message.encoded_len());
213 message
214 .encode(&mut buffer)
215 .map_err(|err| io::Error::from(err))?;
216 self.stream.send(WebSocketMessage::Binary(buffer)).await?;
217 Ok(())
218 }
219}
220
221impl<S> MessageStream<S>
222where
223 S: futures::Stream<Item = Result<WebSocketMessage, WebSocketError>> + Unpin,
224{
225 /// Read a protobuf message of the given type from the stream.
226 pub async fn read_message(&mut self) -> Result<Envelope, WebSocketError> {
227 while let Some(bytes) = self.stream.next().await {
228 match bytes? {
229 WebSocketMessage::Binary(bytes) => {
230 let envelope = Envelope::decode(bytes.as_slice()).map_err(io::Error::from)?;
231 return Ok(envelope);
232 }
233 WebSocketMessage::Close(_) => break,
234 _ => {}
235 }
236 }
237 Err(WebSocketError::ConnectionClosed)
238 }
239}
240
241impl Into<SystemTime> for Timestamp {
242 fn into(self) -> SystemTime {
243 UNIX_EPOCH
244 .checked_add(Duration::new(self.seconds, self.nanos))
245 .unwrap()
246 }
247}
248
249impl From<SystemTime> for Timestamp {
250 fn from(time: SystemTime) -> Self {
251 let duration = time.duration_since(UNIX_EPOCH).unwrap();
252 Self {
253 seconds: duration.as_secs(),
254 nanos: duration.subsec_nanos(),
255 }
256 }
257}
258
259impl From<u128> for Nonce {
260 fn from(nonce: u128) -> Self {
261 let upper_half = (nonce >> 64) as u64;
262 let lower_half = nonce as u64;
263 Self {
264 upper_half,
265 lower_half,
266 }
267 }
268}
269
270impl From<Nonce> for u128 {
271 fn from(nonce: Nonce) -> Self {
272 let upper_half = (nonce.upper_half as u128) << 64;
273 let lower_half = nonce.lower_half as u128;
274 upper_half | lower_half
275 }
276}