Merge branch 'main' into assists

Antonio Scandurra created

Change summary

Cargo.lock                     |   2 
crates/client/src/channel.rs   |  38 +--
crates/client/src/client.rs    |  62 ++++-
crates/client/src/test.rs      |   6 
crates/project/src/project.rs  | 164 ++++++---------
crates/project/src/worktree.rs |  92 +++-----
crates/rpc/src/peer.rs         | 172 +++++++--------
crates/server/Cargo.toml       |   2 
crates/server/src/rpc.rs       | 371 +++++++++++++++--------------------
9 files changed, 412 insertions(+), 497 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -5822,7 +5822,9 @@ dependencies = [
  "clap 3.0.0-beta.2",
  "collections",
  "comrak",
+ "ctor",
  "either",
+ "env_logger",
  "envy",
  "futures",
  "gpui",

crates/client/src/channel.rs 🔗

@@ -17,7 +17,7 @@ use std::{
 };
 use sum_tree::{Bias, SumTree};
 use time::OffsetDateTime;
-use util::{post_inc, TryFutureExt};
+use util::{post_inc, ResultExt as _, TryFutureExt};
 
 pub struct ChannelList {
     available_channels: Option<Vec<ChannelDetails>>,
@@ -168,16 +168,12 @@ impl ChannelList {
 impl Entity for Channel {
     type Event = ChannelEvent;
 
-    fn release(&mut self, cx: &mut MutableAppContext) {
-        let rpc = self.rpc.clone();
-        let channel_id = self.details.id;
-        cx.foreground()
-            .spawn(async move {
-                if let Err(error) = rpc.send(proto::LeaveChannel { channel_id }).await {
-                    log::error!("error leaving channel: {}", error);
-                };
+    fn release(&mut self, _: &mut MutableAppContext) {
+        self.rpc
+            .send(proto::LeaveChannel {
+                channel_id: self.details.id,
             })
-            .detach()
+            .log_err();
     }
 }
 
@@ -718,18 +714,16 @@ mod tests {
         });
 
         // Receive a new message.
-        server
-            .send(proto::ChannelMessageSent {
-                channel_id: channel.read_with(&cx, |channel, _| channel.details.id),
-                message: Some(proto::ChannelMessage {
-                    id: 12,
-                    body: "c".into(),
-                    timestamp: 1002,
-                    sender_id: 7,
-                    nonce: Some(3.into()),
-                }),
-            })
-            .await;
+        server.send(proto::ChannelMessageSent {
+            channel_id: channel.read_with(&cx, |channel, _| channel.details.id),
+            message: Some(proto::ChannelMessage {
+                id: 12,
+                body: "c".into(),
+                timestamp: 1002,
+                sender_id: 7,
+                nonce: Some(3.into()),
+            }),
+        });
 
         // Client requests user for message since they haven't seen them yet
         let get_users = server.receive::<proto::GetUsers>().await.unwrap();

crates/client/src/client.rs 🔗

@@ -24,8 +24,10 @@ use std::{
     collections::HashMap,
     convert::TryFrom,
     fmt::Write as _,
-    future::Future,
-    sync::{Arc, Weak},
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc, Weak,
+    },
     time::{Duration, Instant},
 };
 use surf::{http::Method, Url};
@@ -55,6 +57,7 @@ pub fn init(rpc: Arc<Client>, cx: &mut MutableAppContext) {
 }
 
 pub struct Client {
+    id: usize,
     peer: Arc<Peer>,
     http: Arc<dyn HttpClient>,
     state: RwLock<ClientState>,
@@ -167,7 +170,12 @@ impl Drop for Subscription {
 
 impl Client {
     pub fn new(http: Arc<dyn HttpClient>) -> Arc<Self> {
+        lazy_static! {
+            static ref NEXT_CLIENT_ID: AtomicUsize = AtomicUsize::default();
+        }
+
         Arc::new(Self {
+            id: NEXT_CLIENT_ID.fetch_add(1, Ordering::SeqCst),
             peer: Peer::new(),
             http,
             state: Default::default(),
@@ -448,21 +456,31 @@ impl Client {
                             None
                         };
 
+                        let type_name = message.payload_type_name();
+
                         let handler_key = (payload_type_id, entity_id);
                         if let Some(handler) = state.model_handlers.get_mut(&handler_key) {
                             let mut handler = handler.take().unwrap();
                             drop(state); // Avoid deadlocks if the handler interacts with rpc::Client
-                            let start_time = Instant::now();
-                            log::info!("RPC client message {}", message.payload_type_name());
+
+                            log::debug!(
+                                "rpc message received. client_id:{}, name:{}",
+                                this.id,
+                                type_name
+                            );
                             (handler)(message, &mut cx);
-                            log::info!("RPC message handled. duration:{:?}", start_time.elapsed());
+                            log::debug!(
+                                "rpc message handled. client_id:{}, name:{}",
+                                this.id,
+                                type_name
+                            );
 
                             let mut state = this.state.write();
                             if state.model_handlers.contains_key(&handler_key) {
                                 state.model_handlers.insert(handler_key, Some(handler));
                             }
                         } else {
-                            log::info!("unhandled message {}", message.payload_type_name());
+                            log::info!("unhandled message {}", type_name);
                         }
                     }
                 }
@@ -677,19 +695,32 @@ impl Client {
         }
     }
 
-    pub async fn send<T: EnvelopedMessage>(&self, message: T) -> Result<()> {
-        self.peer.send(self.connection_id()?, message).await
+    pub fn send<T: EnvelopedMessage>(&self, message: T) -> Result<()> {
+        log::debug!("rpc send. client_id:{}, name:{}", self.id, T::NAME);
+        self.peer.send(self.connection_id()?, message)
     }
 
     pub async fn request<T: RequestMessage>(&self, request: T) -> Result<T::Response> {
-        self.peer.request(self.connection_id()?, request).await
+        log::debug!(
+            "rpc request start. client_id: {}. name:{}",
+            self.id,
+            T::NAME
+        );
+        let response = self.peer.request(self.connection_id()?, request).await;
+        log::debug!(
+            "rpc request finish. client_id: {}. name:{}",
+            self.id,
+            T::NAME
+        );
+        response
     }
 
     pub fn respond<T: RequestMessage>(
         &self,
         receipt: Receipt<T>,
         response: T::Response,
-    ) -> impl Future<Output = Result<()>> {
+    ) -> Result<()> {
+        log::debug!("rpc respond. client_id: {}. name:{}", self.id, T::NAME);
         self.peer.respond(receipt, response)
     }
 
@@ -697,7 +728,8 @@ impl Client {
         &self,
         receipt: Receipt<T>,
         error: proto::Error,
-    ) -> impl Future<Output = Result<()>> {
+    ) -> Result<()> {
+        log::debug!("rpc respond. client_id: {}. name:{}", self.id, T::NAME);
         self.peer.respond_with_error(receipt, error)
     }
 }
@@ -860,8 +892,8 @@ mod tests {
         });
         drop(subscription3);
 
-        server.send(proto::UnshareProject { project_id: 1 }).await;
-        server.send(proto::UnshareProject { project_id: 2 }).await;
+        server.send(proto::UnshareProject { project_id: 1 });
+        server.send(proto::UnshareProject { project_id: 2 });
         done_rx1.next().await.unwrap();
         done_rx2.next().await.unwrap();
     }
@@ -890,7 +922,7 @@ mod tests {
                 Ok(())
             })
         });
-        server.send(proto::Ping {}).await;
+        server.send(proto::Ping {});
         done_rx2.next().await.unwrap();
     }
 
@@ -914,7 +946,7 @@ mod tests {
                 },
             ));
         });
-        server.send(proto::Ping {}).await;
+        server.send(proto::Ping {});
         done_rx.next().await.unwrap();
     }
 

crates/client/src/test.rs 🔗

@@ -118,8 +118,8 @@ impl FakeServer {
         self.forbid_connections.store(false, SeqCst);
     }
 
-    pub async fn send<T: proto::EnvelopedMessage>(&self, message: T) {
-        self.peer.send(self.connection_id(), message).await.unwrap();
+    pub fn send<T: proto::EnvelopedMessage>(&self, message: T) {
+        self.peer.send(self.connection_id(), message).unwrap();
     }
 
     pub async fn receive<M: proto::EnvelopedMessage>(&self) -> Result<TypedEnvelope<M>> {
@@ -148,7 +148,7 @@ impl FakeServer {
         receipt: Receipt<T>,
         response: T::Response,
     ) {
-        self.peer.respond(receipt, response).await.unwrap()
+        self.peer.respond(receipt, response).unwrap()
     }
 
     fn connection_id(&self) -> ConnectionId {

crates/project/src/project.rs 🔗

@@ -461,7 +461,7 @@ impl Project {
                 }
             })?;
 
-            rpc.send(proto::UnshareProject { project_id }).await?;
+            rpc.send(proto::UnshareProject { project_id })?;
             this.update(&mut cx, |this, cx| {
                 this.collaborators.clear();
                 this.shared_buffers.clear();
@@ -856,15 +856,13 @@ impl Project {
                 let this = cx.read(|cx| this.upgrade(cx))?;
                 match message {
                     LspEvent::DiagnosticsStart => {
-                        let send = this.update(&mut cx, |this, cx| {
+                        this.update(&mut cx, |this, cx| {
                             this.disk_based_diagnostics_started(cx);
-                            this.remote_id().map(|project_id| {
+                            if let Some(project_id) = this.remote_id() {
                                 rpc.send(proto::DiskBasedDiagnosticsUpdating { project_id })
-                            })
+                                    .log_err();
+                            }
                         });
-                        if let Some(send) = send {
-                            send.await.log_err();
-                        }
                     }
                     LspEvent::DiagnosticsUpdate(mut params) => {
                         language.process_diagnostics(&mut params);
@@ -874,15 +872,13 @@ impl Project {
                         });
                     }
                     LspEvent::DiagnosticsFinish => {
-                        let send = this.update(&mut cx, |this, cx| {
+                        this.update(&mut cx, |this, cx| {
                             this.disk_based_diagnostics_finished(cx);
-                            this.remote_id().map(|project_id| {
+                            if let Some(project_id) = this.remote_id() {
                                 rpc.send(proto::DiskBasedDiagnosticsUpdated { project_id })
-                            })
+                                    .log_err();
+                            }
                         });
-                        if let Some(send) = send {
-                            send.await.log_err();
-                        }
                     }
                 }
             }
@@ -1501,15 +1497,13 @@ impl Project {
                         };
 
                         if let Some(project_id) = self.remote_id() {
-                            let client = self.client.clone();
-                            let message = proto::UpdateBufferFile {
-                                project_id,
-                                buffer_id: *buffer_id as u64,
-                                file: Some(new_file.to_proto()),
-                            };
-                            cx.foreground()
-                                .spawn(async move { client.send(message).await })
-                                .detach_and_log_err(cx);
+                            self.client
+                                .send(proto::UpdateBufferFile {
+                                    project_id,
+                                    buffer_id: *buffer_id as u64,
+                                    file: Some(new_file.to_proto()),
+                                })
+                                .log_err();
                         }
                         buffer.file_updated(Box::new(new_file), cx).detach();
                     }
@@ -1829,8 +1823,7 @@ impl Project {
                             version: (&version).into(),
                             mtime: Some(mtime.into()),
                         },
-                    )
-                    .await?;
+                    )?;
 
                     Ok(())
                 }
@@ -1859,16 +1852,13 @@ impl Project {
             // associated with formatting.
             cx.spawn(|_| async move {
                 match format {
-                    Ok(()) => rpc.respond(receipt, proto::Ack {}).await?,
-                    Err(error) => {
-                        rpc.respond_with_error(
-                            receipt,
-                            proto::Error {
-                                message: error.to_string(),
-                            },
-                        )
-                        .await?
-                    }
+                    Ok(()) => rpc.respond(receipt, proto::Ack {})?,
+                    Err(error) => rpc.respond_with_error(
+                        receipt,
+                        proto::Error {
+                            message: error.to_string(),
+                        },
+                    )?,
                 }
                 Ok::<_, anyhow::Error>(())
             })
@@ -1902,27 +1892,21 @@ impl Project {
                 .update(&mut cx, |buffer, cx| buffer.completions(position, cx))
                 .await
             {
-                Ok(completions) => {
-                    rpc.respond(
-                        receipt,
-                        proto::GetCompletionsResponse {
-                            completions: completions
-                                .iter()
-                                .map(language::proto::serialize_completion)
-                                .collect(),
-                        },
-                    )
-                    .await
-                }
-                Err(error) => {
-                    rpc.respond_with_error(
-                        receipt,
-                        proto::Error {
-                            message: error.to_string(),
-                        },
-                    )
-                    .await
-                }
+                Ok(completions) => rpc.respond(
+                    receipt,
+                    proto::GetCompletionsResponse {
+                        completions: completions
+                            .iter()
+                            .map(language::proto::serialize_completion)
+                            .collect(),
+                    },
+                ),
+                Err(error) => rpc.respond_with_error(
+                    receipt,
+                    proto::Error {
+                        message: error.to_string(),
+                    },
+                ),
             }
         })
         .detach_and_log_err(cx);
@@ -1957,30 +1941,24 @@ impl Project {
                 })
                 .await
             {
-                Ok(edit_ids) => {
-                    rpc.respond(
-                        receipt,
-                        proto::ApplyCompletionAdditionalEditsResponse {
-                            additional_edits: edit_ids
-                                .into_iter()
-                                .map(|edit_id| proto::AdditionalEdit {
-                                    replica_id: edit_id.replica_id as u32,
-                                    local_timestamp: edit_id.value,
-                                })
-                                .collect(),
-                        },
-                    )
-                    .await
-                }
-                Err(error) => {
-                    rpc.respond_with_error(
-                        receipt,
-                        proto::Error {
-                            message: error.to_string(),
-                        },
-                    )
-                    .await
-                }
+                Ok(edit_ids) => rpc.respond(
+                    receipt,
+                    proto::ApplyCompletionAdditionalEditsResponse {
+                        additional_edits: edit_ids
+                            .into_iter()
+                            .map(|edit_id| proto::AdditionalEdit {
+                                replica_id: edit_id.replica_id as u32,
+                                local_timestamp: edit_id.value,
+                            })
+                            .collect(),
+                    },
+                ),
+                Err(error) => rpc.respond_with_error(
+                    receipt,
+                    proto::Error {
+                        message: error.to_string(),
+                    },
+                ),
             }
         })
         .detach_and_log_err(cx);
@@ -2026,7 +2004,7 @@ impl Project {
                     });
                 }
             });
-            rpc.respond(receipt, response).await?;
+            rpc.respond(receipt, response)?;
             Ok::<_, anyhow::Error>(())
         })
         .detach_and_log_err(cx);
@@ -2062,7 +2040,6 @@ impl Project {
                         buffer: Some(buffer),
                     },
                 )
-                .await
             }
             .log_err()
         })
@@ -2296,28 +2273,21 @@ impl<'a> Iterator for CandidateSetIter<'a> {
 impl Entity for Project {
     type Event = Event;
 
-    fn release(&mut self, cx: &mut gpui::MutableAppContext) {
+    fn release(&mut self, _: &mut gpui::MutableAppContext) {
         match &self.client_state {
             ProjectClientState::Local { remote_id_rx, .. } => {
                 if let Some(project_id) = *remote_id_rx.borrow() {
-                    let rpc = self.client.clone();
-                    cx.spawn(|_| async move {
-                        if let Err(err) = rpc.send(proto::UnregisterProject { project_id }).await {
-                            log::error!("error unregistering project: {}", err);
-                        }
-                    })
-                    .detach();
+                    self.client
+                        .send(proto::UnregisterProject { project_id })
+                        .log_err();
                 }
             }
             ProjectClientState::Remote { remote_id, .. } => {
-                let rpc = self.client.clone();
-                let project_id = *remote_id;
-                cx.spawn(|_| async move {
-                    if let Err(err) = rpc.send(proto::LeaveProject { project_id }).await {
-                        log::error!("error leaving project: {}", err);
-                    }
-                })
-                .detach();
+                self.client
+                    .send(proto::LeaveProject {
+                        project_id: *remote_id,
+                    })
+                    .log_err();
             }
         }
     }

crates/project/src/worktree.rs 🔗

@@ -149,7 +149,7 @@ pub enum Event {
 impl Entity for Worktree {
     type Event = Event;
 
-    fn release(&mut self, cx: &mut MutableAppContext) {
+    fn release(&mut self, _: &mut MutableAppContext) {
         if let Some(worktree) = self.as_local_mut() {
             if let Registration::Done { project_id } = worktree.registration {
                 let client = worktree.client.clone();
@@ -157,12 +157,7 @@ impl Entity for Worktree {
                     project_id,
                     worktree_id: worktree.id().to_proto(),
                 };
-                cx.foreground()
-                    .spawn(async move {
-                        client.send(unregister_message).await?;
-                        Ok::<_, anyhow::Error>(())
-                    })
-                    .detach_and_log_err(cx);
+                client.send(unregister_message).log_err();
             }
         }
     }
@@ -596,7 +591,7 @@ impl LocalWorktree {
         &mut self,
         worktree_path: Arc<Path>,
         diagnostics: Vec<DiagnosticEntry<PointUtf16>>,
-        cx: &mut ModelContext<Worktree>,
+        _: &mut ModelContext<Worktree>,
     ) -> Result<()> {
         let summary = DiagnosticSummary::new(&diagnostics);
         self.diagnostic_summaries
@@ -604,30 +599,19 @@ impl LocalWorktree {
         self.diagnostics.insert(worktree_path.clone(), diagnostics);
 
         if let Some(share) = self.share.as_ref() {
-            cx.foreground()
-                .spawn({
-                    let client = self.client.clone();
-                    let project_id = share.project_id;
-                    let worktree_id = self.id().to_proto();
-                    let path = worktree_path.to_string_lossy().to_string();
-                    async move {
-                        client
-                            .send(proto::UpdateDiagnosticSummary {
-                                project_id,
-                                worktree_id,
-                                summary: Some(proto::DiagnosticSummary {
-                                    path,
-                                    error_count: summary.error_count as u32,
-                                    warning_count: summary.warning_count as u32,
-                                    info_count: summary.info_count as u32,
-                                    hint_count: summary.hint_count as u32,
-                                }),
-                            })
-                            .await
-                            .log_err()
-                    }
+            self.client
+                .send(proto::UpdateDiagnosticSummary {
+                    project_id: share.project_id,
+                    worktree_id: self.id().to_proto(),
+                    summary: Some(proto::DiagnosticSummary {
+                        path: worktree_path.to_string_lossy().to_string(),
+                        error_count: summary.error_count as u32,
+                        warning_count: summary.warning_count as u32,
+                        info_count: summary.info_count as u32,
+                        hint_count: summary.hint_count as u32,
+                    }),
                 })
-                .detach();
+                .log_err();
         }
 
         Ok(())
@@ -787,7 +771,7 @@ impl LocalWorktree {
                 while let Ok(snapshot) = snapshots_to_send_rx.recv().await {
                     let message =
                         snapshot.build_update(&prev_snapshot, project_id, worktree_id, false);
-                    match rpc.send(message).await {
+                    match rpc.send(message) {
                         Ok(()) => prev_snapshot = snapshot,
                         Err(err) => log::error!("error sending snapshot diff {}", err),
                     }
@@ -1377,8 +1361,7 @@ impl language::File for File {
                             buffer_id,
                             version: (&version).into(),
                             mtime: Some(entry.mtime.into()),
-                        })
-                        .await?;
+                        })?;
                     }
                     Ok((version, entry.mtime))
                 })
@@ -1501,23 +1484,15 @@ impl language::File for File {
     }
 
     fn buffer_removed(&self, buffer_id: u64, cx: &mut MutableAppContext) {
-        self.worktree.update(cx, |worktree, cx| {
+        self.worktree.update(cx, |worktree, _| {
             if let Worktree::Remote(worktree) = worktree {
-                let project_id = worktree.project_id;
-                let rpc = worktree.client.clone();
-                cx.background()
-                    .spawn(async move {
-                        if let Err(error) = rpc
-                            .send(proto::CloseBuffer {
-                                project_id,
-                                buffer_id,
-                            })
-                            .await
-                        {
-                            log::error!("error closing remote buffer: {}", error);
-                        }
+                worktree
+                    .client
+                    .send(proto::CloseBuffer {
+                        project_id: worktree.project_id,
+                        buffer_id,
                     })
-                    .detach();
+                    .log_err();
             }
         });
     }
@@ -1563,16 +1538,15 @@ impl language::LocalFile for File {
     ) {
         let worktree = self.worktree.read(cx).as_local().unwrap();
         if let Some(project_id) = worktree.share.as_ref().map(|share| share.project_id) {
-            let rpc = worktree.client.clone();
-            let message = proto::BufferReloaded {
-                project_id,
-                buffer_id,
-                version: version.into(),
-                mtime: Some(mtime.into()),
-            };
-            cx.background()
-                .spawn(async move { rpc.send(message).await })
-                .detach_and_log_err(cx);
+            worktree
+                .client
+                .send(proto::BufferReloaded {
+                    project_id,
+                    buffer_id,
+                    version: version.into(),
+                    mtime: Some(mtime.into()),
+                })
+                .log_err();
         }
     }
 }

crates/rpc/src/peer.rs 🔗

@@ -5,7 +5,7 @@ use futures::stream::BoxStream;
 use futures::{FutureExt as _, StreamExt};
 use parking_lot::{Mutex, RwLock};
 use postage::{
-    mpsc,
+    barrier, mpsc,
     prelude::{Sink as _, Stream as _},
 };
 use smol_timeout::TimeoutExt as _;
@@ -89,9 +89,10 @@ pub struct Peer {
 
 #[derive(Clone)]
 pub struct ConnectionState {
-    outgoing_tx: mpsc::Sender<proto::Envelope>,
+    outgoing_tx: futures::channel::mpsc::UnboundedSender<proto::Envelope>,
     next_message_id: Arc<AtomicU32>,
-    response_channels: Arc<Mutex<Option<HashMap<u32, mpsc::Sender<proto::Envelope>>>>>,
+    response_channels:
+        Arc<Mutex<Option<HashMap<u32, mpsc::Sender<(proto::Envelope, barrier::Sender)>>>>>,
 }
 
 const WRITE_TIMEOUT: Duration = Duration::from_secs(10);
@@ -112,9 +113,14 @@ impl Peer {
         impl Future<Output = anyhow::Result<()>> + Send,
         BoxStream<'static, Box<dyn AnyTypedEnvelope>>,
     ) {
-        let connection_id = ConnectionId(self.next_connection_id.fetch_add(1, SeqCst));
+        // For outgoing messages, use an unbounded channel so that application code
+        // can always send messages without yielding. For incoming messages, use a
+        // bounded channel so that other peers will receive backpressure if they send
+        // messages faster than this peer can process them.
         let (mut incoming_tx, incoming_rx) = mpsc::channel(64);
-        let (outgoing_tx, mut outgoing_rx) = mpsc::channel(64);
+        let (outgoing_tx, mut outgoing_rx) = futures::channel::mpsc::unbounded();
+
+        let connection_id = ConnectionId(self.next_connection_id.fetch_add(1, SeqCst));
         let connection_state = ConnectionState {
             outgoing_tx,
             next_message_id: Default::default(),
@@ -131,6 +137,16 @@ impl Peer {
                 futures::pin_mut!(read_message);
                 loop {
                     futures::select_biased! {
+                        outgoing = outgoing_rx.next().fuse() => match outgoing {
+                            Some(outgoing) => {
+                                match writer.write_message(&outgoing).timeout(WRITE_TIMEOUT).await {
+                                    None => break 'outer Err(anyhow!("timed out writing RPC message")),
+                                    Some(Err(result)) => break 'outer Err(result).context("failed to write RPC message"),
+                                    _ => {}
+                                }
+                            }
+                            None => break 'outer Ok(()),
+                        },
                         incoming = read_message => match incoming {
                             Ok(incoming) => {
                                 if incoming_tx.send(incoming).await.is_err() {
@@ -142,16 +158,6 @@ impl Peer {
                                 break 'outer Err(error).context("received invalid RPC message")
                             }
                         },
-                        outgoing = outgoing_rx.recv().fuse() => match outgoing {
-                            Some(outgoing) => {
-                                match writer.write_message(&outgoing).timeout(WRITE_TIMEOUT).await {
-                                    None => break 'outer Err(anyhow!("timed out writing RPC message")),
-                                    Some(Err(result)) => break 'outer Err(result).context("failed to write RPC message"),
-                                    _ => {}
-                                }
-                            }
-                            None => break 'outer Ok(()),
-                        }
                     }
                 }
             };
@@ -172,7 +178,9 @@ impl Peer {
                 if let Some(responding_to) = incoming.responding_to {
                     let channel = response_channels.lock().as_mut()?.remove(&responding_to);
                     if let Some(mut tx) = channel {
-                        tx.send(incoming).await.ok();
+                        let mut requester_resumed = barrier::channel();
+                        tx.send((incoming, requester_resumed.0)).await.ok();
+                        requester_resumed.1.recv().await;
                     } else {
                         log::warn!("received RPC response to unknown request {}", responding_to);
                     }
@@ -200,7 +208,7 @@ impl Peer {
     }
 
     pub fn request<T: RequestMessage>(
-        self: &Arc<Self>,
+        &self,
         receiver_id: ConnectionId,
         request: T,
     ) -> impl Future<Output = Result<T::Response>> {
@@ -208,7 +216,7 @@ impl Peer {
     }
 
     pub fn forward_request<T: RequestMessage>(
-        self: &Arc<Self>,
+        &self,
         sender_id: ConnectionId,
         receiver_id: ConnectionId,
         request: T,
@@ -217,15 +225,13 @@ impl Peer {
     }
 
     pub fn request_internal<T: RequestMessage>(
-        self: &Arc<Self>,
+        &self,
         original_sender_id: Option<ConnectionId>,
         receiver_id: ConnectionId,
         request: T,
     ) -> impl Future<Output = Result<T::Response>> {
-        let this = self.clone();
         let (tx, mut rx) = mpsc::channel(1);
-        async move {
-            let mut connection = this.connection_state(receiver_id)?;
+        let send = self.connection_state(receiver_id).and_then(|connection| {
             let message_id = connection.next_message_id.fetch_add(1, SeqCst);
             connection
                 .response_channels
@@ -235,10 +241,17 @@ impl Peer {
                 .insert(message_id, tx);
             connection
                 .outgoing_tx
-                .send(request.into_envelope(message_id, None, original_sender_id.map(|id| id.0)))
-                .await
+                .unbounded_send(request.into_envelope(
+                    message_id,
+                    None,
+                    original_sender_id.map(|id| id.0),
+                ))
                 .map_err(|_| anyhow!("connection was closed"))?;
-            let response = rx
+            Ok(())
+        });
+        async move {
+            send?;
+            let (response, _barrier) = rx
                 .recv()
                 .await
                 .ok_or_else(|| anyhow!("connection was closed"))?;
@@ -251,81 +264,61 @@ impl Peer {
         }
     }
 
-    pub fn send<T: EnvelopedMessage>(
-        self: &Arc<Self>,
-        receiver_id: ConnectionId,
-        message: T,
-    ) -> impl Future<Output = Result<()>> {
-        let this = self.clone();
-        async move {
-            let mut connection = this.connection_state(receiver_id)?;
-            let message_id = connection
-                .next_message_id
-                .fetch_add(1, atomic::Ordering::SeqCst);
-            connection
-                .outgoing_tx
-                .send(message.into_envelope(message_id, None, None))
-                .await?;
-            Ok(())
-        }
+    pub fn send<T: EnvelopedMessage>(&self, receiver_id: ConnectionId, message: T) -> Result<()> {
+        let connection = self.connection_state(receiver_id)?;
+        let message_id = connection
+            .next_message_id
+            .fetch_add(1, atomic::Ordering::SeqCst);
+        connection
+            .outgoing_tx
+            .unbounded_send(message.into_envelope(message_id, None, None))?;
+        Ok(())
     }
 
     pub fn forward_send<T: EnvelopedMessage>(
-        self: &Arc<Self>,
+        &self,
         sender_id: ConnectionId,
         receiver_id: ConnectionId,
         message: T,
-    ) -> impl Future<Output = Result<()>> {
-        let this = self.clone();
-        async move {
-            let mut connection = this.connection_state(receiver_id)?;
-            let message_id = connection
-                .next_message_id
-                .fetch_add(1, atomic::Ordering::SeqCst);
-            connection
-                .outgoing_tx
-                .send(message.into_envelope(message_id, None, Some(sender_id.0)))
-                .await?;
-            Ok(())
-        }
+    ) -> Result<()> {
+        let connection = self.connection_state(receiver_id)?;
+        let message_id = connection
+            .next_message_id
+            .fetch_add(1, atomic::Ordering::SeqCst);
+        connection
+            .outgoing_tx
+            .unbounded_send(message.into_envelope(message_id, None, Some(sender_id.0)))?;
+        Ok(())
     }
 
     pub fn respond<T: RequestMessage>(
-        self: &Arc<Self>,
+        &self,
         receipt: Receipt<T>,
         response: T::Response,
-    ) -> impl Future<Output = Result<()>> {
-        let this = self.clone();
-        async move {
-            let mut connection = this.connection_state(receipt.sender_id)?;
-            let message_id = connection
-                .next_message_id
-                .fetch_add(1, atomic::Ordering::SeqCst);
-            connection
-                .outgoing_tx
-                .send(response.into_envelope(message_id, Some(receipt.message_id), None))
-                .await?;
-            Ok(())
-        }
+    ) -> Result<()> {
+        let connection = self.connection_state(receipt.sender_id)?;
+        let message_id = connection
+            .next_message_id
+            .fetch_add(1, atomic::Ordering::SeqCst);
+        connection
+            .outgoing_tx
+            .unbounded_send(response.into_envelope(message_id, Some(receipt.message_id), None))?;
+        Ok(())
     }
 
     pub fn respond_with_error<T: RequestMessage>(
-        self: &Arc<Self>,
+        &self,
         receipt: Receipt<T>,
         response: proto::Error,
-    ) -> impl Future<Output = Result<()>> {
-        let this = self.clone();
-        async move {
-            let mut connection = this.connection_state(receipt.sender_id)?;
-            let message_id = connection
-                .next_message_id
-                .fetch_add(1, atomic::Ordering::SeqCst);
-            connection
-                .outgoing_tx
-                .send(response.into_envelope(message_id, Some(receipt.message_id), None))
-                .await?;
-            Ok(())
-        }
+    ) -> Result<()> {
+        let connection = self.connection_state(receipt.sender_id)?;
+        let message_id = connection
+            .next_message_id
+            .fetch_add(1, atomic::Ordering::SeqCst);
+        connection
+            .outgoing_tx
+            .unbounded_send(response.into_envelope(message_id, Some(receipt.message_id), None))?;
+        Ok(())
     }
 
     fn connection_state(&self, connection_id: ConnectionId) -> Result<ConnectionState> {
@@ -447,7 +440,7 @@ mod tests {
                 let envelope = envelope.into_any();
                 if let Some(envelope) = envelope.downcast_ref::<TypedEnvelope<proto::Ping>>() {
                     let receipt = envelope.receipt();
-                    peer.respond(receipt, proto::Ack {}).await?
+                    peer.respond(receipt, proto::Ack {})?
                 } else if let Some(envelope) =
                     envelope.downcast_ref::<TypedEnvelope<proto::OpenBuffer>>()
                 {
@@ -475,7 +468,7 @@ mod tests {
                         }
                     };
 
-                    peer.respond(receipt, response).await?
+                    peer.respond(receipt, response)?
                 } else {
                     panic!("unknown message type");
                 }
@@ -518,7 +511,6 @@ mod tests {
                             message: "message 1".to_string(),
                         },
                     )
-                    .await
                     .unwrap();
                 server
                     .send(
@@ -527,12 +519,8 @@ mod tests {
                             message: "message 2".to_string(),
                         },
                     )
-                    .await
-                    .unwrap();
-                server
-                    .respond(request.receipt(), proto::Ack {})
-                    .await
                     .unwrap();
+                server.respond(request.receipt(), proto::Ack {}).unwrap();
 
                 // Prevent the connection from being dropped
                 server_incoming.next().await;

crates/server/Cargo.toml 🔗

@@ -59,6 +59,8 @@ features = ["runtime-async-std-rustls", "postgres", "time", "uuid"]
 collections = { path = "../collections", features = ["test-support"] }
 gpui = { path = "../gpui" }
 zed = { path = "../zed", features = ["test-support"] }
+ctor = "0.1"
+env_logger = "0.8"
 
 lazy_static = "1.4"
 serde_json = { version = "1.0.64", features = ["preserve_order"] }

crates/server/src/rpc.rs 🔗

@@ -131,7 +131,7 @@ impl Server {
             }
 
             this.state_mut().add_connection(connection_id, user_id);
-            if let Err(err) = this.update_contacts_for_users(&[user_id]).await {
+            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
                 log::error!("error updating contacts for {:?}: {}", user_id, err);
             }
 
@@ -141,34 +141,35 @@ impl Server {
                 let next_message = incoming_rx.next().fuse();
                 futures::pin_mut!(next_message);
                 futures::select_biased! {
+                    result = handle_io => {
+                        if let Err(err) = result {
+                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
+                        }
+                        break;
+                    }
                     message = next_message => {
                         if let Some(message) = message {
                             let start_time = Instant::now();
-                            log::info!("RPC message received: {}", message.payload_type_name());
+                            let type_name = message.payload_type_name();
+                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
                             if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
                                 if let Err(err) = (handler)(this.clone(), message).await {
-                                    log::error!("error handling message: {:?}", err);
+                                    log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
                                 } else {
-                                    log::info!("RPC message handled. duration:{:?}", start_time.elapsed());
+                                    log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
                                 }
 
                                 if let Some(mut notifications) = this.notifications.clone() {
                                     let _ = notifications.send(()).await;
                                 }
                             } else {
-                                log::warn!("unhandled message: {}", message.payload_type_name());
+                                log::warn!("unhandled message: {}", type_name);
                             }
                         } else {
                             log::info!("rpc connection closed {:?}", addr);
                             break;
                         }
                     }
-                    handle_io = handle_io => {
-                        if let Err(err) = handle_io {
-                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
-                        }
-                        break;
-                    }
                 }
             }
 
@@ -191,8 +192,7 @@ impl Server {
                         self.peer
                             .send(conn_id, proto::UnshareProject { project_id })
                     },
-                )
-                .await?;
+                )?;
             }
         }
 
@@ -205,18 +205,15 @@ impl Server {
                         peer_id: connection_id.0,
                     },
                 )
-            })
-            .await?;
+            })?;
         }
 
-        self.update_contacts_for_users(removed_connection.contact_ids.iter())
-            .await?;
-
+        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
         Ok(())
     }
 
     async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
-        self.peer.respond(request.receipt(), proto::Ack {}).await?;
+        self.peer.respond(request.receipt(), proto::Ack {})?;
         Ok(())
     }
 
@@ -229,12 +226,10 @@ impl Server {
             let user_id = state.user_id_for_connection(request.sender_id)?;
             state.register_project(request.sender_id, user_id)
         };
-        self.peer
-            .respond(
-                request.receipt(),
-                proto::RegisterProjectResponse { project_id },
-            )
-            .await?;
+        self.peer.respond(
+            request.receipt(),
+            proto::RegisterProjectResponse { project_id },
+        )?;
         Ok(())
     }
 
@@ -246,8 +241,7 @@ impl Server {
             .state_mut()
             .unregister_project(request.payload.project_id, request.sender_id)
             .ok_or_else(|| anyhow!("no such project"))?;
-        self.update_contacts_for_users(project.authorized_user_ids().iter())
-            .await?;
+        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
         Ok(())
     }
 
@@ -257,7 +251,7 @@ impl Server {
     ) -> tide::Result<()> {
         self.state_mut()
             .share_project(request.payload.project_id, request.sender_id);
-        self.peer.respond(request.receipt(), proto::Ack {}).await?;
+        self.peer.respond(request.receipt(), proto::Ack {})?;
         Ok(())
     }
 
@@ -273,11 +267,8 @@ impl Server {
         broadcast(request.sender_id, project.connection_ids, |conn_id| {
             self.peer
                 .send(conn_id, proto::UnshareProject { project_id })
-        })
-        .await?;
-        self.update_contacts_for_users(&project.authorized_user_ids)
-            .await?;
-
+        })?;
+        self.update_contacts_for_users(&project.authorized_user_ids)?;
         Ok(())
     }
 
@@ -351,20 +342,17 @@ impl Server {
                             }),
                         },
                     )
-                })
-                .await?;
-                self.peer.respond(request.receipt(), response).await?;
-                self.update_contacts_for_users(&contact_user_ids).await?;
+                })?;
+                self.peer.respond(request.receipt(), response)?;
+                self.update_contacts_for_users(&contact_user_ids)?;
             }
             Err(error) => {
-                self.peer
-                    .respond_with_error(
-                        request.receipt(),
-                        proto::Error {
-                            message: error.to_string(),
-                        },
-                    )
-                    .await?;
+                self.peer.respond_with_error(
+                    request.receipt(),
+                    proto::Error {
+                        message: error.to_string(),
+                    },
+                )?;
             }
         }
 
@@ -387,10 +375,8 @@ impl Server {
                         peer_id: sender_id.0,
                     },
                 )
-            })
-            .await?;
-            self.update_contacts_for_users(&worktree.authorized_user_ids)
-                .await?;
+            })?;
+            self.update_contacts_for_users(&worktree.authorized_user_ids)?;
         }
         Ok(())
     }
@@ -412,8 +398,7 @@ impl Server {
                 Err(err) => {
                     let message = err.to_string();
                     self.peer
-                        .respond_with_error(receipt, proto::Error { message })
-                        .await?;
+                        .respond_with_error(receipt, proto::Error { message })?;
                     return Ok(());
                 }
             }
@@ -432,17 +417,15 @@ impl Server {
         );
 
         if ok {
-            self.peer.respond(receipt, proto::Ack {}).await?;
-            self.update_contacts_for_users(&contact_user_ids).await?;
+            self.peer.respond(receipt, proto::Ack {})?;
+            self.update_contacts_for_users(&contact_user_ids)?;
         } else {
-            self.peer
-                .respond_with_error(
-                    receipt,
-                    proto::Error {
-                        message: NO_SUCH_PROJECT.to_string(),
-                    },
-                )
-                .await?;
+            self.peer.respond_with_error(
+                receipt,
+                proto::Error {
+                    message: NO_SUCH_PROJECT.to_string(),
+                },
+            )?;
         }
 
         Ok(())
@@ -457,7 +440,6 @@ impl Server {
         let (worktree, guest_connection_ids) =
             self.state_mut()
                 .unregister_worktree(project_id, worktree_id, request.sender_id)?;
-
         broadcast(request.sender_id, guest_connection_ids, |conn_id| {
             self.peer.send(
                 conn_id,
@@ -466,10 +448,8 @@ impl Server {
                     worktree_id,
                 },
             )
-        })
-        .await?;
-        self.update_contacts_for_users(&worktree.authorized_user_ids)
-            .await?;
+        })?;
+        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
         Ok(())
     }
 
@@ -511,20 +491,16 @@ impl Server {
                         request.payload.clone(),
                     )
                 },
-            )
-            .await?;
-            self.peer.respond(request.receipt(), proto::Ack {}).await?;
-            self.update_contacts_for_users(&shared_worktree.authorized_user_ids)
-                .await?;
+            )?;
+            self.peer.respond(request.receipt(), proto::Ack {})?;
+            self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?;
         } else {
-            self.peer
-                .respond_with_error(
-                    request.receipt(),
-                    proto::Error {
-                        message: "no such worktree".to_string(),
-                    },
-                )
-                .await?;
+            self.peer.respond_with_error(
+                request.receipt(),
+                proto::Error {
+                    message: "no such worktree".to_string(),
+                },
+            )?;
         }
         Ok(())
     }
@@ -547,8 +523,7 @@ impl Server {
         broadcast(request.sender_id, connection_ids, |connection_id| {
             self.peer
                 .forward_send(request.sender_id, connection_id, request.payload.clone())
-        })
-        .await?;
+        })?;
 
         Ok(())
     }
@@ -574,8 +549,7 @@ impl Server {
         broadcast(request.sender_id, receiver_ids, |connection_id| {
             self.peer
                 .forward_send(request.sender_id, connection_id, request.payload.clone())
-        })
-        .await?;
+        })?;
         Ok(())
     }
 
@@ -590,8 +564,7 @@ impl Server {
         broadcast(request.sender_id, receiver_ids, |connection_id| {
             self.peer
                 .forward_send(request.sender_id, connection_id, request.payload.clone())
-        })
-        .await?;
+        })?;
         Ok(())
     }
 
@@ -606,8 +579,7 @@ impl Server {
         broadcast(request.sender_id, receiver_ids, |connection_id| {
             self.peer
                 .forward_send(request.sender_id, connection_id, request.payload.clone())
-        })
-        .await?;
+        })?;
         Ok(())
     }
 
@@ -625,7 +597,7 @@ impl Server {
             .peer
             .forward_request(request.sender_id, host_connection_id, request.payload)
             .await?;
-        self.peer.respond(receipt, response).await?;
+        self.peer.respond(receipt, response)?;
         Ok(())
     }
 
@@ -643,7 +615,7 @@ impl Server {
             .peer
             .forward_request(request.sender_id, host_connection_id, request.payload)
             .await?;
-        self.peer.respond(receipt, response).await?;
+        self.peer.respond(receipt, response)?;
         Ok(())
     }
 
@@ -657,8 +629,7 @@ impl Server {
             .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
             .host_connection_id;
         self.peer
-            .forward_send(request.sender_id, host_connection_id, request.payload)
-            .await?;
+            .forward_send(request.sender_id, host_connection_id, request.payload)?;
         Ok(())
     }
 
@@ -686,16 +657,12 @@ impl Server {
 
         broadcast(host, guests, |conn_id| {
             let response = response.clone();
-            let peer = &self.peer;
-            async move {
-                if conn_id == sender {
-                    peer.respond(receipt, response).await
-                } else {
-                    peer.forward_send(host, conn_id, response).await
-                }
+            if conn_id == sender {
+                self.peer.respond(receipt, response)
+            } else {
+                self.peer.forward_send(host, conn_id, response)
             }
-        })
-        .await?;
+        })?;
 
         Ok(())
     }
@@ -719,7 +686,7 @@ impl Server {
             .peer
             .forward_request(sender, host, request.payload.clone())
             .await?;
-        self.peer.respond(receipt, response).await?;
+        self.peer.respond(receipt, response)?;
 
         Ok(())
     }
@@ -743,8 +710,7 @@ impl Server {
             .peer
             .forward_request(sender, host, request.payload.clone())
             .await?;
-        self.peer.respond(receipt, response).await?;
-
+        self.peer.respond(receipt, response)?;
         Ok(())
     }
 
@@ -767,8 +733,7 @@ impl Server {
             .peer
             .forward_request(sender, host, request.payload.clone())
             .await?;
-        self.peer.respond(receipt, response).await?;
-
+        self.peer.respond(receipt, response)?;
         Ok(())
     }
 
@@ -783,9 +748,8 @@ impl Server {
         broadcast(request.sender_id, receiver_ids, |connection_id| {
             self.peer
                 .forward_send(request.sender_id, connection_id, request.payload.clone())
-        })
-        .await?;
-        self.peer.respond(request.receipt(), proto::Ack {}).await?;
+        })?;
+        self.peer.respond(request.receipt(), proto::Ack {})?;
         Ok(())
     }
 
@@ -800,8 +764,7 @@ impl Server {
         broadcast(request.sender_id, receiver_ids, |connection_id| {
             self.peer
                 .forward_send(request.sender_id, connection_id, request.payload.clone())
-        })
-        .await?;
+        })?;
         Ok(())
     }
 
@@ -816,8 +779,7 @@ impl Server {
         broadcast(request.sender_id, receiver_ids, |connection_id| {
             self.peer
                 .forward_send(request.sender_id, connection_id, request.payload.clone())
-        })
-        .await?;
+        })?;
         Ok(())
     }
 
@@ -832,8 +794,7 @@ impl Server {
         broadcast(request.sender_id, receiver_ids, |connection_id| {
             self.peer
                 .forward_send(request.sender_id, connection_id, request.payload.clone())
-        })
-        .await?;
+        })?;
         Ok(())
     }
 
@@ -843,20 +804,18 @@ impl Server {
     ) -> tide::Result<()> {
         let user_id = self.state().user_id_for_connection(request.sender_id)?;
         let channels = self.app_state.db.get_accessible_channels(user_id).await?;
-        self.peer
-            .respond(
-                request.receipt(),
-                proto::GetChannelsResponse {
-                    channels: channels
-                        .into_iter()
-                        .map(|chan| proto::Channel {
-                            id: chan.id.to_proto(),
-                            name: chan.name,
-                        })
-                        .collect(),
-                },
-            )
-            .await?;
+        self.peer.respond(
+            request.receipt(),
+            proto::GetChannelsResponse {
+                channels: channels
+                    .into_iter()
+                    .map(|chan| proto::Channel {
+                        id: chan.id.to_proto(),
+                        name: chan.name,
+                    })
+                    .collect(),
+            },
+        )?;
         Ok(())
     }
 
@@ -879,34 +838,30 @@ impl Server {
             })
             .collect();
         self.peer
-            .respond(receipt, proto::GetUsersResponse { users })
-            .await?;
+            .respond(receipt, proto::GetUsersResponse { users })?;
         Ok(())
     }
 
-    async fn update_contacts_for_users<'a>(
+    fn update_contacts_for_users<'a>(
         self: &Arc<Server>,
         user_ids: impl IntoIterator<Item = &'a UserId>,
-    ) -> tide::Result<()> {
-        let mut send_futures = Vec::new();
-
-        {
-            let state = self.state();
-            for user_id in user_ids {
-                let contacts = state.contacts_for_user(*user_id);
-                for connection_id in state.connection_ids_for_user(*user_id) {
-                    send_futures.push(self.peer.send(
-                        connection_id,
-                        proto::UpdateContacts {
-                            contacts: contacts.clone(),
-                        },
-                    ));
+    ) -> anyhow::Result<()> {
+        let mut result = Ok(());
+        let state = self.state();
+        for user_id in user_ids {
+            let contacts = state.contacts_for_user(*user_id);
+            for connection_id in state.connection_ids_for_user(*user_id) {
+                if let Err(error) = self.peer.send(
+                    connection_id,
+                    proto::UpdateContacts {
+                        contacts: contacts.clone(),
+                    },
+                ) {
+                    result = Err(error);
                 }
             }
         }
-        futures::future::try_join_all(send_futures).await?;
-
-        Ok(())
+        result
     }
 
     async fn join_channel(
@@ -939,15 +894,13 @@ impl Server {
                 nonce: Some(msg.nonce.as_u128().into()),
             })
             .collect::<Vec<_>>();
-        self.peer
-            .respond(
-                request.receipt(),
-                proto::JoinChannelResponse {
-                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
-                    messages,
-                },
-            )
-            .await?;
+        self.peer.respond(
+            request.receipt(),
+            proto::JoinChannelResponse {
+                done: messages.len() < MESSAGE_COUNT_PER_PAGE,
+                messages,
+            },
+        )?;
         Ok(())
     }
 
@@ -993,25 +946,21 @@ impl Server {
         // Validate the message body.
         let body = request.payload.body.trim().to_string();
         if body.len() > MAX_MESSAGE_LEN {
-            self.peer
-                .respond_with_error(
-                    receipt,
-                    proto::Error {
-                        message: "message is too long".to_string(),
-                    },
-                )
-                .await?;
+            self.peer.respond_with_error(
+                receipt,
+                proto::Error {
+                    message: "message is too long".to_string(),
+                },
+            )?;
             return Ok(());
         }
         if body.is_empty() {
-            self.peer
-                .respond_with_error(
-                    receipt,
-                    proto::Error {
-                        message: "message can't be blank".to_string(),
-                    },
-                )
-                .await?;
+            self.peer.respond_with_error(
+                receipt,
+                proto::Error {
+                    message: "message can't be blank".to_string(),
+                },
+            )?;
             return Ok(());
         }
 
@@ -1019,14 +968,12 @@ impl Server {
         let nonce = if let Some(nonce) = request.payload.nonce {
             nonce
         } else {
-            self.peer
-                .respond_with_error(
-                    receipt,
-                    proto::Error {
-                        message: "nonce can't be blank".to_string(),
-                    },
-                )
-                .await?;
+            self.peer.respond_with_error(
+                receipt,
+                proto::Error {
+                    message: "nonce can't be blank".to_string(),
+                },
+            )?;
             return Ok(());
         };
 
@@ -1051,16 +998,13 @@ impl Server {
                     message: Some(message.clone()),
                 },
             )
-        })
-        .await?;
-        self.peer
-            .respond(
-                receipt,
-                proto::SendChannelMessageResponse {
-                    message: Some(message),
-                },
-            )
-            .await?;
+        })?;
+        self.peer.respond(
+            receipt,
+            proto::SendChannelMessageResponse {
+                message: Some(message),
+            },
+        )?;
         Ok(())
     }
 
@@ -1097,15 +1041,13 @@ impl Server {
                 nonce: Some(msg.nonce.as_u128().into()),
             })
             .collect::<Vec<_>>();
-        self.peer
-            .respond(
-                request.receipt(),
-                proto::GetChannelMessagesResponse {
-                    done: messages.len() < MESSAGE_COUNT_PER_PAGE,
-                    messages,
-                },
-            )
-            .await?;
+        self.peer.respond(
+            request.receipt(),
+            proto::GetChannelMessagesResponse {
+                done: messages.len() < MESSAGE_COUNT_PER_PAGE,
+                messages,
+            },
+        )?;
         Ok(())
     }
 
@@ -1118,21 +1060,25 @@ impl Server {
     }
 }
 
-pub async fn broadcast<F, T>(
+fn broadcast<F>(
     sender_id: ConnectionId,
     receiver_ids: Vec<ConnectionId>,
     mut f: F,
 ) -> anyhow::Result<()>
 where
-    F: FnMut(ConnectionId) -> T,
-    T: Future<Output = anyhow::Result<()>>,
+    F: FnMut(ConnectionId) -> anyhow::Result<()>,
 {
-    let futures = receiver_ids
-        .into_iter()
-        .filter(|id| *id != sender_id)
-        .map(|id| f(id));
-    futures::future::try_join_all(futures).await?;
-    Ok(())
+    let mut result = Ok(());
+    for receiver_id in receiver_ids {
+        if receiver_id != sender_id {
+            if let Err(error) = f(receiver_id) {
+                if result.is_ok() {
+                    result = Err(error);
+                }
+            }
+        }
+    }
+    result
 }
 
 pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
@@ -1247,6 +1193,13 @@ mod tests {
         project::{DiagnosticSummary, Project, ProjectPath},
     };
 
+    #[cfg(test)]
+    #[ctor::ctor]
+    fn init_logger() {
+        // std::env::set_var("RUST_LOG", "info");
+        env_logger::init();
+    }
+
     #[gpui::test]
     async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
         let (window_b, _) = cx_b.add_window(|_| EmptyView);