1use super::{ConnectionId, PeerId, TypedEnvelope};
2use anyhow::Result;
3use async_tungstenite::tungstenite::{Error as WebSocketError, Message as WebSocketMessage};
4use futures::{SinkExt as _, StreamExt as _};
5use prost::Message;
6use std::any::{Any, TypeId};
7use std::{
8 io,
9 time::{Duration, SystemTime, UNIX_EPOCH},
10};
11
12include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
13
14pub trait EnvelopedMessage: Clone + Sized + Send + Sync + 'static {
15 const NAME: &'static str;
16 fn into_envelope(
17 self,
18 id: u32,
19 responding_to: Option<u32>,
20 original_sender_id: Option<u32>,
21 ) -> Envelope;
22 fn from_envelope(envelope: Envelope) -> Option<Self>;
23}
24
25pub trait EntityMessage: EnvelopedMessage {
26 fn remote_entity_id(&self) -> u64;
27}
28
29pub trait RequestMessage: EnvelopedMessage {
30 type Response: EnvelopedMessage;
31}
32
33pub trait AnyTypedEnvelope: 'static + Send + Sync {
34 fn payload_type_id(&self) -> TypeId;
35 fn payload_type_name(&self) -> &'static str;
36 fn as_any(&self) -> &dyn Any;
37 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
38}
39
40impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
41 fn payload_type_id(&self) -> TypeId {
42 TypeId::of::<T>()
43 }
44
45 fn payload_type_name(&self) -> &'static str {
46 T::NAME
47 }
48
49 fn as_any(&self) -> &dyn Any {
50 self
51 }
52
53 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
54 self
55 }
56}
57
58macro_rules! messages {
59 ($($name:ident),* $(,)?) => {
60 pub fn build_typed_envelope(sender_id: ConnectionId, envelope: Envelope) -> Option<Box<dyn AnyTypedEnvelope>> {
61 match envelope.payload {
62 $(Some(envelope::Payload::$name(payload)) => {
63 Some(Box::new(TypedEnvelope {
64 sender_id,
65 original_sender_id: envelope.original_sender_id.map(PeerId),
66 message_id: envelope.id,
67 payload,
68 }))
69 }, )*
70 _ => None
71 }
72 }
73
74 $(
75 impl EnvelopedMessage for $name {
76 const NAME: &'static str = std::stringify!($name);
77
78 fn into_envelope(
79 self,
80 id: u32,
81 responding_to: Option<u32>,
82 original_sender_id: Option<u32>,
83 ) -> Envelope {
84 Envelope {
85 id,
86 responding_to,
87 original_sender_id,
88 payload: Some(envelope::Payload::$name(self)),
89 }
90 }
91
92 fn from_envelope(envelope: Envelope) -> Option<Self> {
93 if let Some(envelope::Payload::$name(msg)) = envelope.payload {
94 Some(msg)
95 } else {
96 None
97 }
98 }
99 }
100 )*
101 };
102}
103
104macro_rules! request_messages {
105 ($(($request_name:ident, $response_name:ident)),* $(,)?) => {
106 $(impl RequestMessage for $request_name {
107 type Response = $response_name;
108 })*
109 };
110}
111
112macro_rules! entity_messages {
113 ($id_field:ident, $($name:ident),* $(,)?) => {
114 $(impl EntityMessage for $name {
115 fn remote_entity_id(&self) -> u64 {
116 self.$id_field
117 }
118 })*
119 };
120}
121
122messages!(
123 Ack,
124 AddPeer,
125 BufferSaved,
126 ChannelMessageSent,
127 CloseBuffer,
128 CloseWorktree,
129 Error,
130 GetChannelMessages,
131 GetChannelMessagesResponse,
132 GetChannels,
133 GetChannelsResponse,
134 UpdateCollaborators,
135 GetUsers,
136 GetUsersResponse,
137 JoinChannel,
138 JoinChannelResponse,
139 JoinWorktree,
140 JoinWorktreeResponse,
141 LeaveChannel,
142 LeaveWorktree,
143 OpenBuffer,
144 OpenBufferResponse,
145 OpenWorktree,
146 OpenWorktreeResponse,
147 Ping,
148 RemovePeer,
149 SaveBuffer,
150 SendChannelMessage,
151 SendChannelMessageResponse,
152 ShareWorktree,
153 ShareWorktreeResponse,
154 UnshareWorktree,
155 UpdateBuffer,
156 UpdateWorktree,
157);
158
159request_messages!(
160 (GetChannels, GetChannelsResponse),
161 (GetUsers, GetUsersResponse),
162 (JoinChannel, JoinChannelResponse),
163 (OpenBuffer, OpenBufferResponse),
164 (JoinWorktree, JoinWorktreeResponse),
165 (OpenWorktree, OpenWorktreeResponse),
166 (Ping, Ack),
167 (SaveBuffer, BufferSaved),
168 (UpdateBuffer, Ack),
169 (ShareWorktree, ShareWorktreeResponse),
170 (UnshareWorktree, Ack),
171 (SendChannelMessage, SendChannelMessageResponse),
172 (GetChannelMessages, GetChannelMessagesResponse),
173);
174
175entity_messages!(
176 worktree_id,
177 AddPeer,
178 BufferSaved,
179 CloseBuffer,
180 CloseWorktree,
181 OpenBuffer,
182 JoinWorktree,
183 RemovePeer,
184 SaveBuffer,
185 UnshareWorktree,
186 UpdateBuffer,
187 UpdateWorktree,
188);
189
190entity_messages!(channel_id, ChannelMessageSent);
191
192/// A stream of protobuf messages.
193pub struct MessageStream<S> {
194 stream: S,
195}
196
197impl<S> MessageStream<S> {
198 pub fn new(stream: S) -> Self {
199 Self { stream }
200 }
201
202 pub fn inner_mut(&mut self) -> &mut S {
203 &mut self.stream
204 }
205}
206
207impl<S> MessageStream<S>
208where
209 S: futures::Sink<WebSocketMessage, Error = WebSocketError> + Unpin,
210{
211 /// Write a given protobuf message to the stream.
212 pub async fn write_message(&mut self, message: &Envelope) -> Result<(), WebSocketError> {
213 let mut buffer = Vec::with_capacity(message.encoded_len());
214 message
215 .encode(&mut buffer)
216 .map_err(|err| io::Error::from(err))?;
217 self.stream.send(WebSocketMessage::Binary(buffer)).await?;
218 Ok(())
219 }
220}
221
222impl<S> MessageStream<S>
223where
224 S: futures::Stream<Item = Result<WebSocketMessage, WebSocketError>> + Unpin,
225{
226 /// Read a protobuf message of the given type from the stream.
227 pub async fn read_message(&mut self) -> Result<Envelope, WebSocketError> {
228 while let Some(bytes) = self.stream.next().await {
229 match bytes? {
230 WebSocketMessage::Binary(bytes) => {
231 let envelope = Envelope::decode(bytes.as_slice()).map_err(io::Error::from)?;
232 return Ok(envelope);
233 }
234 WebSocketMessage::Close(_) => break,
235 _ => {}
236 }
237 }
238 Err(WebSocketError::ConnectionClosed)
239 }
240}
241
242impl Into<SystemTime> for Timestamp {
243 fn into(self) -> SystemTime {
244 UNIX_EPOCH
245 .checked_add(Duration::new(self.seconds, self.nanos))
246 .unwrap()
247 }
248}
249
250impl From<SystemTime> for Timestamp {
251 fn from(time: SystemTime) -> Self {
252 let duration = time.duration_since(UNIX_EPOCH).unwrap();
253 Self {
254 seconds: duration.as_secs(),
255 nanos: duration.subsec_nanos(),
256 }
257 }
258}
259
260impl From<u128> for Nonce {
261 fn from(nonce: u128) -> Self {
262 let upper_half = (nonce >> 64) as u64;
263 let lower_half = nonce as u64;
264 Self {
265 upper_half,
266 lower_half,
267 }
268 }
269}
270
271impl From<Nonce> for u128 {
272 fn from(nonce: Nonce) -> Self {
273 let upper_half = (nonce.upper_half as u128) << 64;
274 let lower_half = nonce.lower_half as u128;
275 upper_half | lower_half
276 }
277}