1use postage::prelude::Stream;
2use std::{future::Future, sync::Arc};
3use zed_rpc::{proto, Peer, TypedEnvelope};
4
5pub trait MessageHandler<'a, M: proto::EnvelopedMessage> {
6 type Output: 'a + Future<Output = anyhow::Result<()>>;
7
8 fn handle(
9 &self,
10 message: TypedEnvelope<M>,
11 rpc: &'a Arc<Peer>,
12 cx: &'a mut gpui::AsyncAppContext,
13 ) -> Self::Output;
14}
15
16impl<'a, M, F, Fut> MessageHandler<'a, M> for F
17where
18 M: proto::EnvelopedMessage,
19 F: Fn(TypedEnvelope<M>, &'a Arc<Peer>, &'a mut gpui::AsyncAppContext) -> Fut,
20 Fut: 'a + Future<Output = anyhow::Result<()>>,
21{
22 type Output = Fut;
23
24 fn handle(
25 &self,
26 message: TypedEnvelope<M>,
27 rpc: &'a Arc<Peer>,
28 cx: &'a mut gpui::AsyncAppContext,
29 ) -> Self::Output {
30 (self)(message, rpc, cx)
31 }
32}
33
34pub trait PeerExt {
35 fn handle_messages<H, M>(&self, handler: H, cx: &mut gpui::MutableAppContext)
36 where
37 H: 'static + for<'a> MessageHandler<'a, M>,
38 M: proto::EnvelopedMessage;
39}
40
41impl PeerExt for Arc<Peer> {
42 fn handle_messages<H, M>(&self, handler: H, cx: &mut gpui::MutableAppContext)
43 where
44 H: 'static + for<'a> MessageHandler<'a, M>,
45 M: proto::EnvelopedMessage,
46 {
47 let rpc = self.clone();
48 let mut messages = smol::block_on(self.add_message_handler::<M>());
49 cx.spawn(|mut cx| async move {
50 while let Some(message) = messages.recv().await {
51 if let Err(err) = handler.handle(message, &rpc, &mut cx).await {
52 log::error!("error handling message: {:?}", err);
53 }
54 }
55 })
56 .detach();
57 }
58}