diff --git a/server/src/tests.rs b/server/src/tests.rs index 257bdfb8d8633f567921836026602555fafee73c..cce311c5f3242f97837fe289439f268347751eaf 100644 --- a/server/src/tests.rs +++ b/server/src/tests.rs @@ -14,7 +14,7 @@ use sqlx::{ types::time::OffsetDateTime, Executor as _, Postgres, }; -use std::{path::Path, sync::Arc, time::SystemTime}; +use std::{path::Path, sync::Arc}; use zed::{ editor::Editor, fs::{FakeFs, Fs as _}, diff --git a/zed/src/channel.rs b/zed/src/channel.rs index 9ce1f7a97cd201e88a517a65193a966a6b669665..2aa2a966eea928ed1bc835de45385d7daed0fdd9 100644 --- a/zed/src/channel.rs +++ b/zed/src/channel.rs @@ -1,5 +1,5 @@ use crate::rpc::{self, Client}; -use futures::StreamExt; +use anyhow::Result; use gpui::{Entity, ModelContext, Task, WeakModelHandle}; use std::{ collections::{HashMap, VecDeque}, @@ -22,7 +22,7 @@ pub struct Channel { first_message_id: Option, messages: Option>, rpc: Arc, - _receive_messages: Task<()>, + _message_handler: Task<()>, } pub struct ChannelMessage { @@ -50,28 +50,23 @@ impl Entity for Channel { impl Channel { pub fn new(details: ChannelDetails, rpc: Arc, cx: &mut ModelContext) -> Self { - let mut messages = rpc.subscribe(); - let receive_messages = cx.spawn_weak(|this, mut cx| async move { - while let Some(message) = messages.next().await { - if let Some(this) = this.upgrade(&cx) { - this.update(&mut cx, |this, cx| this.message_received(&message, cx)); - } - } - }); + let _message_handler = rpc.subscribe_from_model(details.id, cx, Self::handle_message_sent); Self { details, rpc, first_message_id: None, messages: None, - _receive_messages: receive_messages, + _message_handler, } } - fn message_received( + fn handle_message_sent( &mut self, message: &TypedEnvelope, + rpc: rpc::Client, cx: &mut ModelContext, - ) { + ) -> Result<()> { + Ok(()) } } diff --git a/zed/src/rpc.rs b/zed/src/rpc.rs index c987e34b1657cfb2fe23e0af54b75b7a9792078b..07c6cb6fb1a018cf7ae5f06f12d49cefa2edd197 100644 --- a/zed/src/rpc.rs +++ b/zed/src/rpc.rs @@ -2,14 +2,15 @@ use crate::{language::LanguageRegistry, worktree::Worktree}; use anyhow::{anyhow, Context, Result}; use async_tungstenite::tungstenite::http::Request; use async_tungstenite::tungstenite::{Error as WebSocketError, Message as WebSocketMessage}; -use futures::Stream; -use gpui::{AsyncAppContext, ModelHandle, Task, WeakModelHandle}; +use futures::StreamExt; +use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle}; use lazy_static::lazy_static; use smol::lock::RwLock; use std::collections::HashMap; use std::time::Duration; use std::{convert::TryFrom, future::Future, sync::Arc}; use surf::Url; +use zrpc::proto::EntityMessage; pub use zrpc::{proto, ConnectionId, PeerId, TypedEnvelope}; use zrpc::{ proto::{EnvelopedMessage, RequestMessage}, @@ -82,8 +83,32 @@ impl Client { }); } - pub fn subscribe(&self) -> impl Stream>> { - self.peer.subscribe() + pub fn subscribe_from_model( + &self, + remote_id: u64, + cx: &mut ModelContext, + mut handler: F, + ) -> Task<()> + where + T: EntityMessage, + M: Entity, + F: 'static + FnMut(&mut M, &TypedEnvelope, Client, &mut ModelContext) -> Result<()>, + { + let rpc = self.clone(); + let mut incoming = self.peer.subscribe::(); + cx.spawn_weak(|model, mut cx| async move { + while let Some(envelope) = incoming.next().await { + if envelope.payload.remote_entity_id() == remote_id { + if let Some(model) = model.upgrade(&cx) { + model.update(&mut cx, |model, cx| { + if let Err(error) = handler(model, &envelope, rpc.clone(), cx) { + log::error!("error handling message: {}", error) + } + }); + } + } + } + }) } pub async fn log_in_and_connect( diff --git a/zed/src/util.rs b/zed/src/util.rs index dbbb1c45987b57e72fcca630dee7f44f48b5c75d..ea9b544f9a3f9961b1efd8f077690f19aece6fdb 100644 --- a/zed/src/util.rs +++ b/zed/src/util.rs @@ -1,3 +1,4 @@ +use futures::Future; use rand::prelude::*; use std::cmp::Ordering; @@ -81,6 +82,17 @@ impl Iterator for RandomCharIter { } } +pub async fn log_async_errors(f: F) -> impl Future +where + F: Future>, +{ + async { + if let Err(error) = f.await { + log::error!("{}", error) + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/zed/src/worktree.rs b/zed/src/worktree.rs index 1bfe2cd1a2e0819bcecc8df974276438dc19c486..a483a06647d2a741e2c56dcd4ccf268d8a831508 100644 --- a/zed/src/worktree.rs +++ b/zed/src/worktree.rs @@ -10,7 +10,7 @@ use crate::{ rpc::{self, proto}, sum_tree::{self, Cursor, Edit, SumTree}, time::{self, ReplicaId}, - util::Bias, + util::{log_async_errors, Bias}, }; use ::ignore::gitignore::Gitignore; use anyhow::{anyhow, Result}; @@ -76,7 +76,10 @@ impl Entity for Worktree { fn release(&mut self, cx: &mut MutableAppContext) { let rpc = match self { - Self::Local(tree) => tree.rpc.clone(), + Self::Local(tree) => tree + .share + .as_ref() + .map(|share| (share.rpc.clone(), share.remote_id)), Self::Remote(tree) => Some((tree.rpc.clone(), tree.remote_id)), }; @@ -226,6 +229,14 @@ impl Worktree { .detach(); } + let _message_handlers = vec![ + rpc.subscribe_from_model(remote_id, cx, Self::handle_add_peer), + rpc.subscribe_from_model(remote_id, cx, Self::handle_remove_peer), + rpc.subscribe_from_model(remote_id, cx, Self::handle_update), + rpc.subscribe_from_model(remote_id, cx, Self::handle_update_buffer), + rpc.subscribe_from_model(remote_id, cx, Self::handle_buffer_saved), + ]; + Worktree::Remote(RemoteWorktree { remote_id, replica_id, @@ -239,6 +250,7 @@ impl Worktree { .map(|p| (PeerId(p.peer_id), p.replica_id as ReplicaId)) .collect(), languages, + _message_handlers, }) }) }); @@ -289,10 +301,11 @@ impl Worktree { } } - pub fn add_peer( + pub fn handle_add_peer( &mut self, - envelope: TypedEnvelope, - cx: &mut ModelContext, + envelope: &TypedEnvelope, + _: rpc::Client, + cx: &mut ModelContext, ) -> Result<()> { match self { Worktree::Local(worktree) => worktree.add_peer(envelope, cx), @@ -300,10 +313,11 @@ impl Worktree { } } - pub fn remove_peer( + pub fn handle_remove_peer( &mut self, - envelope: TypedEnvelope, - cx: &mut ModelContext, + envelope: &TypedEnvelope, + _: rpc::Client, + cx: &mut ModelContext, ) -> Result<()> { match self { Worktree::Local(worktree) => worktree.remove_peer(envelope, cx), @@ -311,6 +325,51 @@ impl Worktree { } } + pub fn handle_update( + &mut self, + envelope: &TypedEnvelope, + _: rpc::Client, + cx: &mut ModelContext, + ) -> anyhow::Result<()> { + self.as_remote_mut() + .unwrap() + .update_from_remote(envelope, cx) + } + + pub fn handle_open_buffer( + &mut self, + envelope: &TypedEnvelope, + rpc: rpc::Client, + cx: &mut ModelContext, + ) -> anyhow::Result<()> { + let receipt = envelope.receipt(); + + let response = self + .as_local_mut() + .unwrap() + .open_remote_buffer(envelope, cx); + + cx.background() + .spawn(log_async_errors(async move { + rpc.respond(receipt, response.await?).await?; + Ok(()) + })) + .detach(); + + Ok(()) + } + + pub fn handle_close_buffer( + &mut self, + envelope: &TypedEnvelope, + _: rpc::Client, + cx: &mut ModelContext, + ) -> anyhow::Result<()> { + self.as_local_mut() + .unwrap() + .close_remote_buffer(envelope, cx) + } + pub fn peers(&self) -> &HashMap { match self { Worktree::Local(worktree) => &worktree.peers, @@ -356,13 +415,15 @@ impl Worktree { .is_some() } - pub fn update_buffer( + pub fn handle_update_buffer( &mut self, - envelope: proto::UpdateBuffer, + envelope: &TypedEnvelope, + _: rpc::Client, cx: &mut ModelContext, ) -> Result<()> { - let buffer_id = envelope.buffer_id as usize; - let ops = envelope + let payload = envelope.payload.clone(); + let buffer_id = payload.buffer_id as usize; + let ops = payload .operations .into_iter() .map(|op| op.try_into()) @@ -401,34 +462,72 @@ impl Worktree { Ok(()) } - pub fn buffer_saved( + pub fn handle_save_buffer( &mut self, - message: proto::BufferSaved, + envelope: &TypedEnvelope, + rpc: rpc::Client, cx: &mut ModelContext, ) -> Result<()> { - if let Worktree::Remote(worktree) = self { - if let Some(buffer) = worktree - .open_buffers - .get(&(message.buffer_id as usize)) - .and_then(|buf| buf.upgrade(cx)) - { - buffer.update(cx, |buffer, cx| { - let version = message.version.try_into()?; - let mtime = message - .mtime - .ok_or_else(|| anyhow!("missing mtime"))? - .into(); - buffer.did_save(version, mtime, cx); - Result::<_, anyhow::Error>::Ok(()) - })?; - } - Ok(()) - } else { - Err(anyhow!( - "invalid buffer {} in buffer saved message", - message.buffer_id - )) + let sender_id = envelope.original_sender_id()?; + let buffer = self + .as_local() + .unwrap() + .shared_buffers + .get(&sender_id) + .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned()) + .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?; + + let receipt = envelope.receipt(); + let worktree_id = envelope.payload.worktree_id; + let buffer_id = envelope.payload.buffer_id; + let save = buffer.update(cx, |buffer, cx| buffer.save(cx))?; + + cx.background() + .spawn(log_async_errors(async move { + let (version, mtime) = save.await?; + + rpc.respond( + receipt, + proto::BufferSaved { + worktree_id, + buffer_id, + version: (&version).into(), + mtime: Some(mtime.into()), + }, + ) + .await?; + + Ok(()) + })) + .detach(); + + Ok(()) + } + + pub fn handle_buffer_saved( + &mut self, + envelope: &TypedEnvelope, + _: rpc::Client, + cx: &mut ModelContext, + ) -> Result<()> { + let payload = envelope.payload.clone(); + let worktree = self.as_remote_mut().unwrap(); + if let Some(buffer) = worktree + .open_buffers + .get(&(payload.buffer_id as usize)) + .and_then(|buf| buf.upgrade(cx)) + { + buffer.update(cx, |buffer, cx| { + let version = payload.version.try_into()?; + let mtime = payload + .mtime + .ok_or_else(|| anyhow!("missing mtime"))? + .into(); + buffer.did_save(version, mtime, cx); + Result::<_, anyhow::Error>::Ok(()) + })?; } + Ok(()) } fn poll_snapshot(&mut self, cx: &mut ModelContext) { @@ -561,11 +660,10 @@ impl Deref for Worktree { pub struct LocalWorktree { snapshot: Snapshot, background_snapshot: Arc>, - snapshots_to_send_tx: Option>, last_scan_state_rx: watch::Receiver, _background_scanner_task: Option>, poll_task: Option>, - rpc: Option<(rpc::Client, u64)>, + share: Option, open_buffers: HashMap>, shared_buffers: HashMap>>, peers: HashMap, @@ -619,14 +717,13 @@ impl LocalWorktree { let tree = Self { snapshot: snapshot.clone(), background_snapshot: Arc::new(Mutex::new(snapshot)), - snapshots_to_send_tx: None, last_scan_state_rx, _background_scanner_task: None, + share: None, poll_task: None, open_buffers: Default::default(), shared_buffers: Default::default(), peers: Default::default(), - rpc: None, languages, fs, }; @@ -639,10 +736,8 @@ impl LocalWorktree { this.poll_snapshot(cx); let tree = this.as_local_mut().unwrap(); if !tree.is_scanning() { - if let Some(snapshots_to_send_tx) = - tree.snapshots_to_send_tx.clone() - { - Some((tree.snapshot(), snapshots_to_send_tx)) + if let Some(share) = tree.share.as_ref() { + Some((tree.snapshot(), share.snapshots_tx.clone())) } else { None } @@ -717,7 +812,7 @@ impl LocalWorktree { pub fn open_remote_buffer( &mut self, - envelope: TypedEnvelope, + envelope: &TypedEnvelope, cx: &mut ModelContext, ) -> Task> { let peer_id = envelope.original_sender_id(); @@ -744,7 +839,7 @@ impl LocalWorktree { pub fn close_remote_buffer( &mut self, - envelope: TypedEnvelope, + envelope: &TypedEnvelope, _: &mut ModelContext, ) -> Result<()> { if let Some(shared_buffers) = self.shared_buffers.get_mut(&envelope.original_sender_id()?) { @@ -756,19 +851,24 @@ impl LocalWorktree { pub fn add_peer( &mut self, - envelope: TypedEnvelope, + envelope: &TypedEnvelope, cx: &mut ModelContext, ) -> Result<()> { - let peer = envelope.payload.peer.ok_or_else(|| anyhow!("empty peer"))?; + let peer = envelope + .payload + .peer + .as_ref() + .ok_or_else(|| anyhow!("empty peer"))?; self.peers .insert(PeerId(peer.peer_id), peer.replica_id as ReplicaId); cx.notify(); + Ok(()) } pub fn remove_peer( &mut self, - envelope: TypedEnvelope, + envelope: &TypedEnvelope, cx: &mut ModelContext, ) -> Result<()> { let peer_id = PeerId(envelope.payload.peer_id); @@ -783,6 +883,7 @@ impl LocalWorktree { } } cx.notify(); + Ok(()) } @@ -892,6 +993,7 @@ impl LocalWorktree { cx.spawn(|this, mut cx| async move { let share_request = share_request.await; let share_response = rpc.request(share_request).await?; + let remote_id = share_response.worktree_id; rpc.state .write() @@ -906,11 +1008,10 @@ impl LocalWorktree { cx.background() .spawn({ let rpc = rpc.clone(); - let worktree_id = share_response.worktree_id; async move { let mut prev_snapshot = snapshot; while let Ok(snapshot) = snapshots_to_send_rx.recv().await { - let message = snapshot.build_update(&prev_snapshot, worktree_id); + let message = snapshot.build_update(&prev_snapshot, remote_id); match rpc.send(message).await { Ok(()) => prev_snapshot = snapshot, Err(err) => log::error!("error sending snapshot diff {}", err), @@ -920,13 +1021,26 @@ impl LocalWorktree { }) .detach(); - this.update(&mut cx, |worktree, _| { + this.update(&mut cx, |worktree, cx| { + let _message_handlers = vec![ + rpc.subscribe_from_model(remote_id, cx, Worktree::handle_add_peer), + rpc.subscribe_from_model(remote_id, cx, Worktree::handle_remove_peer), + rpc.subscribe_from_model(remote_id, cx, Worktree::handle_open_buffer), + rpc.subscribe_from_model(remote_id, cx, Worktree::handle_close_buffer), + rpc.subscribe_from_model(remote_id, cx, Worktree::handle_update_buffer), + rpc.subscribe_from_model(remote_id, cx, Worktree::handle_save_buffer), + ]; + let worktree = worktree.as_local_mut().unwrap(); - worktree.rpc = Some((rpc, share_response.worktree_id)); - worktree.snapshots_to_send_tx = Some(snapshots_to_send_tx); + worktree.share = Some(ShareState { + rpc, + remote_id: share_response.worktree_id, + snapshots_tx: snapshots_to_send_tx, + _message_handlers, + }); }); - Ok((share_response.worktree_id, share_response.access_token)) + Ok((remote_id, share_response.access_token)) }) } @@ -978,6 +1092,13 @@ impl fmt::Debug for LocalWorktree { } } +struct ShareState { + rpc: rpc::Client, + remote_id: u64, + snapshots_tx: Sender, + _message_handlers: Vec>, +} + pub struct RemoteWorktree { remote_id: u64, snapshot: Snapshot, @@ -988,6 +1109,7 @@ pub struct RemoteWorktree { open_buffers: HashMap, peers: HashMap, languages: Arc, + _message_handlers: Vec>, } impl RemoteWorktree { @@ -1055,12 +1177,32 @@ impl RemoteWorktree { self.snapshot.clone() } + fn update_from_remote( + &mut self, + envelope: &TypedEnvelope, + cx: &mut ModelContext, + ) -> Result<()> { + let mut tx = self.updates_tx.clone(); + let payload = envelope.payload.clone(); + cx.background() + .spawn(async move { + tx.send(payload).await.expect("receiver runs to completion"); + }) + .detach(); + + Ok(()) + } + pub fn add_peer( &mut self, - envelope: TypedEnvelope, + envelope: &TypedEnvelope, cx: &mut ModelContext, ) -> Result<()> { - let peer = envelope.payload.peer.ok_or_else(|| anyhow!("empty peer"))?; + let peer = envelope + .payload + .peer + .as_ref() + .ok_or_else(|| anyhow!("empty peer"))?; self.peers .insert(PeerId(peer.peer_id), peer.replica_id as ReplicaId); cx.notify(); @@ -1069,7 +1211,7 @@ impl RemoteWorktree { pub fn remove_peer( &mut self, - envelope: TypedEnvelope, + envelope: &TypedEnvelope, cx: &mut ModelContext, ) -> Result<()> { let peer_id = PeerId(envelope.payload.peer_id); @@ -1420,7 +1562,10 @@ impl File { pub fn buffer_updated(&self, buffer_id: u64, operation: Operation, cx: &mut MutableAppContext) { self.worktree.update(cx, |worktree, cx| { if let Some((rpc, remote_id)) = match worktree { - Worktree::Local(worktree) => worktree.rpc.clone(), + Worktree::Local(worktree) => worktree + .share + .as_ref() + .map(|share| (share.rpc.clone(), share.remote_id)), Worktree::Remote(worktree) => Some((worktree.rpc.clone(), worktree.remote_id)), } { cx.spawn(|_, _| async move { @@ -1497,9 +1642,12 @@ impl File { ) -> Task> { self.worktree.update(cx, |worktree, cx| match worktree { Worktree::Local(worktree) => { - let rpc = worktree.rpc.clone(); + let rpc = worktree + .share + .as_ref() + .map(|share| (share.rpc.clone(), share.remote_id)); let save = worktree.save(self.path.clone(), text, cx); - cx.spawn(|_, _| async move { + cx.background().spawn(async move { let entry = save.await?; if let Some((rpc, worktree_id)) = rpc { rpc.send(proto::BufferSaved { @@ -1516,7 +1664,7 @@ impl File { Worktree::Remote(worktree) => { let rpc = worktree.rpc.clone(); let worktree_id = worktree.remote_id; - cx.spawn(|_, _| async move { + cx.background().spawn(async move { let response = rpc .request(proto::SaveBuffer { worktree_id, @@ -2395,7 +2543,9 @@ mod remote { .read() .await .shared_worktree(envelope.payload.worktree_id, cx)? - .update(cx, |worktree, cx| worktree.add_peer(envelope, cx)) + .update(cx, |worktree, cx| { + worktree.handle_add_peer(&envelope, rpc.clone(), cx) + }) } pub async fn remove_peer( @@ -2407,7 +2557,9 @@ mod remote { .read() .await .shared_worktree(envelope.payload.worktree_id, cx)? - .update(cx, |worktree, cx| worktree.remove_peer(envelope, cx)) + .update(cx, |worktree, cx| { + worktree.handle_remove_peer(&envelope, rpc.clone(), cx) + }) } pub async fn update_worktree( @@ -2456,7 +2608,7 @@ mod remote { worktree .as_local_mut() .unwrap() - .open_remote_buffer(envelope, cx) + .open_remote_buffer(&envelope, cx) }) .await?; @@ -2480,7 +2632,7 @@ mod remote { worktree .as_local_mut() .unwrap() - .close_remote_buffer(envelope, cx) + .close_remote_buffer(&envelope, cx) }) } @@ -2489,12 +2641,13 @@ mod remote { rpc: &rpc::Client, cx: &mut AsyncAppContext, ) -> anyhow::Result<()> { - let message = envelope.payload; rpc.state .read() .await - .shared_worktree(message.worktree_id, cx)? - .update(cx, |tree, cx| tree.update_buffer(message, cx))?; + .shared_worktree(envelope.payload.worktree_id, cx)? + .update(cx, |tree, cx| { + tree.handle_update_buffer(&envelope, rpc.clone(), cx) + })?; Ok(()) } @@ -2538,7 +2691,7 @@ mod remote { .await .shared_worktree(envelope.payload.worktree_id, cx)? .update(cx, |worktree, cx| { - worktree.buffer_saved(envelope.payload, cx) + worktree.handle_buffer_saved(&envelope, rpc.clone(), cx) })?; Ok(()) } diff --git a/zrpc/src/peer.rs b/zrpc/src/peer.rs index ef3dd1d9564c31f067a327980798af642ff60cbc..c377ad1309ccc0e6eae58457769ecaa49c520a54 100644 --- a/zrpc/src/peer.rs +++ b/zrpc/src/peer.rs @@ -171,11 +171,10 @@ impl Peer { } } else { router.handle(connection_id, envelope.clone()).await; - match proto::build_typed_envelope(connection_id, envelope) { - Ok(envelope) => { - broadcast_incoming_messages.send(envelope).await.ok(); - } - Err(error) => log::error!("{}", error), + if let Some(envelope) = proto::build_typed_envelope(connection_id, envelope) { + broadcast_incoming_messages.send(envelope).await.ok(); + } else { + log::error!("unable to construct a typed envelope"); } } } diff --git a/zrpc/src/proto.rs b/zrpc/src/proto.rs index 7d4458f1d713796c26150d20052d4da2a793a0d3..1c799ebe21d544af919c30beb60d20fa2a15c291 100644 --- a/zrpc/src/proto.rs +++ b/zrpc/src/proto.rs @@ -1,5 +1,5 @@ use super::{ConnectionId, PeerId, TypedEnvelope}; -use anyhow::{anyhow, Result}; +use anyhow::Result; use async_tungstenite::tungstenite::{Error as WebSocketError, Message as WebSocketMessage}; use futures::{SinkExt as _, StreamExt as _}; use prost::Message; @@ -24,127 +24,133 @@ pub trait EnvelopedMessage: Clone + Sized + Send + Sync + 'static { fn from_envelope(envelope: Envelope) -> Option; } +pub trait EntityMessage: EnvelopedMessage { + fn remote_entity_id(&self) -> u64; +} + pub trait RequestMessage: EnvelopedMessage { type Response: EnvelopedMessage; } macro_rules! messages { - ($($name:ident),*) => { - fn unicast_message_into_typed_envelope(sender_id: ConnectionId, envelope: &mut Envelope) -> Option> { - match &mut envelope.payload { - $(payload @ Some(envelope::Payload::$name(_)) => Some(Arc::new(TypedEnvelope { + ($($name:ident),* $(,)?) => { + pub fn build_typed_envelope(sender_id: ConnectionId, envelope: Envelope) -> Option> { + match envelope.payload { + $(Some(envelope::Payload::$name(payload)) => Some(Arc::new(TypedEnvelope { sender_id, original_sender_id: envelope.original_sender_id.map(PeerId), message_id: envelope.id, - payload: payload.take().unwrap(), + payload, })), )* _ => None } } $( - message!($name); + impl EnvelopedMessage for $name { + const NAME: &'static str = std::stringify!($name); + + fn into_envelope( + self, + id: u32, + responding_to: Option, + original_sender_id: Option, + ) -> Envelope { + Envelope { + id, + responding_to, + original_sender_id, + payload: Some(envelope::Payload::$name(self)), + } + } + + fn matches_envelope(envelope: &Envelope) -> bool { + matches!(&envelope.payload, Some(envelope::Payload::$name(_))) + } + + fn from_envelope(envelope: Envelope) -> Option { + if let Some(envelope::Payload::$name(msg)) = envelope.payload { + Some(msg) + } else { + None + } + } + } )* }; } macro_rules! request_messages { - ($(($request_name:ident, $response_name:ident)),*) => { - fn request_message_into_typed_envelope(sender_id: ConnectionId, envelope: Envelope) -> Option> { - match envelope.payload { - $( - Some(envelope::Payload::$request_name(payload)) => Some(Arc::new(TypedEnvelope { - sender_id, - original_sender_id: envelope.original_sender_id.map(PeerId), - message_id: envelope.id, - payload, - })), - Some(envelope::Payload::$response_name(payload)) => Some(Arc::new(TypedEnvelope { - sender_id, - original_sender_id: envelope.original_sender_id.map(PeerId), - message_id: envelope.id, - payload, - })), - )* - _ => None - } - } - - $( - message!($request_name); - message!($response_name); - )* - + ($(($request_name:ident, $response_name:ident)),* $(,)?) => { $(impl RequestMessage for $request_name { type Response = $response_name; })* }; } -macro_rules! message { - ($name:ident) => { - impl EnvelopedMessage for $name { - const NAME: &'static str = std::stringify!($name); - - fn into_envelope( - self, - id: u32, - responding_to: Option, - original_sender_id: Option, - ) -> Envelope { - Envelope { - id, - responding_to, - original_sender_id, - payload: Some(envelope::Payload::$name(self)), - } - } - - fn matches_envelope(envelope: &Envelope) -> bool { - matches!(&envelope.payload, Some(envelope::Payload::$name(_))) - } - - fn from_envelope(envelope: Envelope) -> Option { - if let Some(envelope::Payload::$name(msg)) = envelope.payload { - Some(msg) - } else { - None - } +macro_rules! entity_messages { + ($id_field:ident, $($name:ident),* $(,)?) => { + $(impl EntityMessage for $name { + fn remote_entity_id(&self) -> u64 { + self.$id_field } - } + })* }; } messages!( - UpdateWorktree, - CloseWorktree, - CloseBuffer, - UpdateBuffer, AddPeer, + Auth, + AuthResponse, + BufferSaved, + ChannelMessageSent, + CloseBuffer, + CloseWorktree, + GetChannels, + GetChannelsResponse, + GetUsers, + GetUsersResponse, + JoinChannel, + JoinChannelResponse, + OpenBuffer, + OpenBufferResponse, + OpenWorktree, + OpenWorktreeResponse, RemovePeer, + SaveBuffer, SendChannelMessage, - ChannelMessageSent + ShareWorktree, + ShareWorktreeResponse, + UpdateBuffer, + UpdateWorktree, ); request_messages!( (Auth, AuthResponse), - (ShareWorktree, ShareWorktreeResponse), - (OpenWorktree, OpenWorktreeResponse), - (OpenBuffer, OpenBufferResponse), - (SaveBuffer, BufferSaved), (GetChannels, GetChannelsResponse), + (GetUsers, GetUsersResponse), (JoinChannel, JoinChannelResponse), - (GetUsers, GetUsersResponse) + (OpenBuffer, OpenBufferResponse), + (OpenWorktree, OpenWorktreeResponse), + (SaveBuffer, BufferSaved), + (ShareWorktree, ShareWorktreeResponse), ); -pub fn build_typed_envelope( - sender_id: ConnectionId, - mut envelope: Envelope, -) -> Result> { - unicast_message_into_typed_envelope(sender_id, &mut envelope) - .or_else(|| request_message_into_typed_envelope(sender_id, envelope)) - .ok_or_else(|| anyhow!("unrecognized payload type")) -} +entity_messages!( + worktree_id, + AddPeer, + BufferSaved, + CloseBuffer, + CloseWorktree, + OpenBuffer, + OpenWorktree, + RemovePeer, + SaveBuffer, + UpdateBuffer, + UpdateWorktree, +); + +entity_messages!(channel_id, ChannelMessageSent); /// A stream of protobuf messages. pub struct MessageStream {