rpc.rs

 1use postage::prelude::Stream;
 2use std::{future::Future, sync::Arc};
 3use zed_rpc::{proto, Peer, TypedEnvelope};
 4
 5pub trait MessageHandler<'a, M: proto::EnvelopedMessage> {
 6    type Output: 'a + Future<Output = anyhow::Result<()>>;
 7
 8    fn handle(
 9        &self,
10        message: TypedEnvelope<M>,
11        rpc: &'a Arc<Peer>,
12        cx: &'a mut gpui::AsyncAppContext,
13    ) -> Self::Output;
14}
15
16impl<'a, M, F, Fut> MessageHandler<'a, M> for F
17where
18    M: proto::EnvelopedMessage,
19    F: Fn(TypedEnvelope<M>, &'a Arc<Peer>, &'a mut gpui::AsyncAppContext) -> Fut,
20    Fut: 'a + Future<Output = anyhow::Result<()>>,
21{
22    type Output = Fut;
23
24    fn handle(
25        &self,
26        message: TypedEnvelope<M>,
27        rpc: &'a Arc<Peer>,
28        cx: &'a mut gpui::AsyncAppContext,
29    ) -> Self::Output {
30        (self)(message, rpc, cx)
31    }
32}
33
34pub trait PeerExt {
35    fn handle_messages<H, M>(&self, handler: H, cx: &mut gpui::MutableAppContext)
36    where
37        H: 'static + for<'a> MessageHandler<'a, M>,
38        M: proto::EnvelopedMessage;
39}
40
41impl PeerExt for Arc<Peer> {
42    fn handle_messages<H, M>(&self, handler: H, cx: &mut gpui::MutableAppContext)
43    where
44        H: 'static + for<'a> MessageHandler<'a, M>,
45        M: proto::EnvelopedMessage,
46    {
47        let rpc = self.clone();
48        let mut messages = smol::block_on(self.add_message_handler::<M>());
49        cx.spawn(|mut cx| async move {
50            while let Some(message) = messages.recv().await {
51                if let Err(err) = handler.handle(message, &rpc, &mut cx).await {
52                    log::error!("error handling message: {:?}", err);
53                }
54            }
55        })
56        .detach();
57    }
58}