Get chat integration tests passing

Max Brunsfeld and Nathan Sobo created

* Don't send a chat message before the previous chat message
  is acknowledged.
* Fix emitting of notifications in RPC server

Co-Authored-By: Nathan Sobo <nathan@zed.dev>

Change summary

crates/client/src/channel.rs | 6 ++++++
crates/server/src/rpc.rs     | 7 ++++---
2 files changed, 10 insertions(+), 3 deletions(-)

Detailed changes

crates/client/src/channel.rs 🔗

@@ -4,6 +4,7 @@ use super::{
     Client, Status, Subscription, TypedEnvelope,
 };
 use anyhow::{anyhow, Context, Result};
+use futures::lock::Mutex;
 use gpui::{
     AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task, WeakModelHandle,
 };
@@ -40,6 +41,7 @@ pub struct Channel {
     next_pending_message_id: usize,
     user_store: ModelHandle<UserStore>,
     rpc: Arc<Client>,
+    outgoing_messages_lock: Arc<Mutex<()>>,
     rng: StdRng,
     _subscription: Subscription,
 }
@@ -214,6 +216,7 @@ impl Channel {
             details,
             user_store,
             rpc,
+            outgoing_messages_lock: Default::default(),
             messages: Default::default(),
             loaded_all_messages: false,
             next_pending_message_id: 0,
@@ -259,13 +262,16 @@ impl Channel {
         );
         let user_store = self.user_store.clone();
         let rpc = self.rpc.clone();
+        let outgoing_messages_lock = self.outgoing_messages_lock.clone();
         Ok(cx.spawn(|this, mut cx| async move {
+            let outgoing_message_guard = outgoing_messages_lock.lock().await;
             let request = rpc.request(proto::SendChannelMessage {
                 channel_id,
                 body,
                 nonce: Some(nonce.into()),
             });
             let response = request.await?;
+            drop(outgoing_message_guard);
             let message = ChannelMessage::from_proto(
                 response.message.ok_or_else(|| anyhow!("invalid message"))?,
                 &user_store,

crates/server/src/rpc.rs 🔗

@@ -191,16 +191,17 @@ impl Server {
                             log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
                             if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
                                 let handle_message = (handler)(this.clone(), message);
+                                let notifications = this.notifications.clone();
                                 executor.spawn_detached(async move {
                                     if let Err(err) = handle_message.await {
                                         log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
                                     } else {
                                         log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
                                     }
+                                    if let Some(mut notifications) = notifications {
+                                        let _ = notifications.send(()).await;
+                                    }
                                 });
-                                if let Some(mut notifications) = this.notifications.clone() {
-                                    let _ = notifications.send(()).await;
-                                }
                             } else {
                                 log::warn!("unhandled message: {}", type_name);
                             }