diff --git a/crates/client/src/channel.rs b/crates/client/src/channel.rs index ab65b4d22830c5ea4d815d6db7f352574c59f013..9c3e1112d6602a7a7dd967349b74937387b48451 100644 --- a/crates/client/src/channel.rs +++ b/crates/client/src/channel.rs @@ -4,6 +4,7 @@ use super::{ Client, Status, Subscription, TypedEnvelope, }; use anyhow::{anyhow, Context, Result}; +use futures::lock::Mutex; use gpui::{ AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task, WeakModelHandle, }; @@ -40,6 +41,7 @@ pub struct Channel { next_pending_message_id: usize, user_store: ModelHandle, rpc: Arc, + outgoing_messages_lock: Arc>, rng: StdRng, _subscription: Subscription, } @@ -214,6 +216,7 @@ impl Channel { details, user_store, rpc, + outgoing_messages_lock: Default::default(), messages: Default::default(), loaded_all_messages: false, next_pending_message_id: 0, @@ -259,13 +262,16 @@ impl Channel { ); let user_store = self.user_store.clone(); let rpc = self.rpc.clone(); + let outgoing_messages_lock = self.outgoing_messages_lock.clone(); Ok(cx.spawn(|this, mut cx| async move { + let outgoing_message_guard = outgoing_messages_lock.lock().await; let request = rpc.request(proto::SendChannelMessage { channel_id, body, nonce: Some(nonce.into()), }); let response = request.await?; + drop(outgoing_message_guard); let message = ChannelMessage::from_proto( response.message.ok_or_else(|| anyhow!("invalid message"))?, &user_store, diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index c7aad00e8b39be5b42f970155e6637e560d9484b..76123f75bcc75742e48bfdf3728dfe79e6d2f4c2 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -191,16 +191,17 @@ impl Server { log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name); if let Some(handler) = this.handlers.get(&message.payload_type_id()) { let handle_message = (handler)(this.clone(), message); + let notifications = this.notifications.clone(); executor.spawn_detached(async move { if let Err(err) = handle_message.await { log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err); } else { log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed()); } + if let Some(mut notifications) = notifications { + let _ = notifications.send(()).await; + } }); - if let Some(mut notifications) = this.notifications.clone() { - let _ = notifications.send(()).await; - } } else { log::warn!("unhandled message: {}", type_name); }