Merge branch 'zed2-hangs' into zed2

Max Brunsfeld created

Change summary

Cargo.lock                                   |   38 
Cargo.toml                                   |    1 
crates/client2/Cargo.toml                    |    6 
crates/client2/src/client2.rs                |    8 
crates/client2/src/user.rs                   |    2 
crates/gpui2/src/platform/test/dispatcher.rs |   19 
crates/language2/Cargo.toml                  |    3 
crates/language2/src/buffer.rs               |    4 
crates/language2/src/language2.rs            |  217 +-
crates/language2/src/proto.rs                |    2 
crates/project2/Cargo.toml                   |    4 
crates/project2/src/worktree.rs              |    6 
crates/rpc2/Cargo.toml                       |   44 
crates/rpc2/build.rs                         |    8 
crates/rpc2/proto/zed.proto                  | 1559 ++++++++++++++++++++++
crates/rpc2/src/auth.rs                      |  136 +
crates/rpc2/src/conn.rs                      |  108 +
crates/rpc2/src/macros.rs                    |   70 
crates/rpc2/src/peer.rs                      |  933 +++++++++++++
crates/rpc2/src/proto.rs                     |  674 +++++++++
crates/rpc2/src/rpc.rs                       |    9 
crates/zed2/Cargo.toml                       |    2 
22 files changed, 3,715 insertions(+), 138 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -1507,7 +1507,7 @@ dependencies = [
  "parking_lot 0.11.2",
  "postage",
  "rand 0.8.5",
- "rpc",
+ "rpc2",
  "schemars",
  "serde",
  "serde_derive",
@@ -4285,7 +4285,6 @@ dependencies = [
  "collections",
  "ctor",
  "env_logger 0.9.3",
- "fs",
  "futures 0.3.28",
  "fuzzy2",
  "git",
@@ -4299,7 +4298,7 @@ dependencies = [
  "postage",
  "rand 0.8.5",
  "regex",
- "rpc",
+ "rpc2",
  "schemars",
  "serde",
  "serde_derive",
@@ -6087,7 +6086,7 @@ dependencies = [
  "pretty_assertions",
  "rand 0.8.5",
  "regex",
- "rpc",
+ "rpc2",
  "schemars",
  "serde",
  "serde_derive",
@@ -6839,6 +6838,35 @@ dependencies = [
  "zstd",
 ]
 
+[[package]]
+name = "rpc2"
+version = "0.1.0"
+dependencies = [
+ "anyhow",
+ "async-lock",
+ "async-tungstenite",
+ "base64 0.13.1",
+ "clock",
+ "collections",
+ "ctor",
+ "env_logger 0.9.3",
+ "futures 0.3.28",
+ "gpui2",
+ "parking_lot 0.11.2",
+ "prost 0.8.0",
+ "prost-build",
+ "rand 0.8.5",
+ "rsa 0.4.0",
+ "serde",
+ "serde_derive",
+ "smol",
+ "smol-timeout",
+ "tempdir",
+ "tracing",
+ "util",
+ "zstd",
+]
+
 [[package]]
 name = "rsa"
 version = "0.4.0"
@@ -10806,7 +10834,7 @@ dependencies = [
  "project2",
  "rand 0.8.5",
  "regex",
- "rpc",
+ "rpc2",
  "rsa 0.4.0",
  "rust-embed",
  "schemars",

Cargo.toml 🔗

@@ -73,6 +73,7 @@ members = [
     "crates/recent_projects",
     "crates/rope",
     "crates/rpc",
+    "crates/rpc2",
     "crates/search",
     "crates/settings",
     "crates/settings2",

crates/client2/Cargo.toml 🔗

@@ -9,14 +9,14 @@ path = "src/client2.rs"
 doctest = false
 
 [features]
-test-support = ["collections/test-support", "gpui2/test-support", "rpc/test-support"]
+test-support = ["collections/test-support", "gpui2/test-support", "rpc2/test-support"]
 
 [dependencies]
 collections = { path = "../collections" }
 db2 = { path = "../db2" }
 gpui2 = { path = "../gpui2" }
 util = { path = "../util" }
-rpc = { path = "../rpc" }
+rpc2 = { path = "../rpc2" }
 text = { path = "../text" }
 settings2 = { path = "../settings2" }
 feature_flags2 = { path = "../feature_flags2" }
@@ -47,6 +47,6 @@ url = "2.2"
 [dev-dependencies]
 collections = { path = "../collections", features = ["test-support"] }
 gpui2 = { path = "../gpui2", features = ["test-support"] }
-rpc = { path = "../rpc", features = ["test-support"] }
+rpc2 = { path = "../rpc2", features = ["test-support"] }
 settings = { path = "../settings", features = ["test-support"] }
 util = { path = "../util", features = ["test-support"] }

crates/client2/src/client2.rs 🔗

@@ -21,7 +21,7 @@ use lazy_static::lazy_static;
 use parking_lot::RwLock;
 use postage::watch;
 use rand::prelude::*;
-use rpc::proto::{AnyTypedEnvelope, EntityMessage, EnvelopedMessage, PeerId, RequestMessage};
+use rpc2::proto::{AnyTypedEnvelope, EntityMessage, EnvelopedMessage, PeerId, RequestMessage};
 use schemars::JsonSchema;
 use serde::{Deserialize, Serialize};
 use settings2::Settings;
@@ -43,7 +43,7 @@ use util::channel::ReleaseChannel;
 use util::http::HttpClient;
 use util::{ResultExt, TryFutureExt};
 
-pub use rpc::*;
+pub use rpc2::*;
 pub use telemetry::ClickhouseEvent;
 pub use user::*;
 
@@ -975,7 +975,7 @@ impl Client {
                 "Authorization",
                 format!("{} {}", credentials.user_id, credentials.access_token),
             )
-            .header("x-zed-protocol-version", rpc::PROTOCOL_VERSION);
+            .header("x-zed-protocol-version", rpc2::PROTOCOL_VERSION);
 
         let http = self.http.clone();
         cx.executor().spawn(async move {
@@ -1025,7 +1025,7 @@ impl Client {
             // zed server to encrypt the user's access token, so that it can'be intercepted by
             // any other app running on the user's device.
             let (public_key, private_key) =
-                rpc::auth::keypair().expect("failed to generate keypair for auth");
+                rpc2::auth::keypair().expect("failed to generate keypair for auth");
             let public_key_string =
                 String::try_from(public_key).expect("failed to serialize public key for auth");
 

crates/client2/src/user.rs 🔗

@@ -5,7 +5,7 @@ use feature_flags2::FeatureFlagAppExt;
 use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt};
 use gpui2::{AsyncAppContext, EventEmitter, Handle, ImageData, ModelContext, Task};
 use postage::{sink::Sink, watch};
-use rpc::proto::{RequestMessage, UsersResponse};
+use rpc2::proto::{RequestMessage, UsersResponse};
 use std::sync::{Arc, Weak};
 use text::ReplicaId;
 use util::http::HttpClient;

crates/gpui2/src/platform/test/dispatcher.rs 🔗

@@ -1,6 +1,6 @@
 use crate::PlatformDispatcher;
 use async_task::Runnable;
-use collections::{BTreeMap, HashMap, VecDeque};
+use collections::{HashMap, VecDeque};
 use parking_lot::Mutex;
 use rand::prelude::*;
 use std::{
@@ -24,7 +24,7 @@ struct TestDispatcherState {
     random: StdRng,
     foreground: HashMap<TestDispatcherId, VecDeque<Runnable>>,
     background: Vec<Runnable>,
-    delayed: BTreeMap<Instant, Runnable>,
+    delayed: Vec<(Instant, Runnable)>,
     time: Instant,
     is_main_thread: bool,
     next_id: TestDispatcherId,
@@ -36,7 +36,7 @@ impl TestDispatcher {
             random,
             foreground: HashMap::default(),
             background: Vec::new(),
-            delayed: BTreeMap::new(),
+            delayed: Vec::new(),
             time: Instant::now(),
             is_main_thread: true,
             next_id: TestDispatcherId(1),
@@ -112,17 +112,20 @@ impl PlatformDispatcher for TestDispatcher {
     fn dispatch_after(&self, duration: std::time::Duration, runnable: Runnable) {
         let mut state = self.state.lock();
         let next_time = state.time + duration;
-        state.delayed.insert(next_time, runnable);
+        let ix = match state.delayed.binary_search_by_key(&next_time, |e| e.0) {
+            Ok(ix) | Err(ix) => ix,
+        };
+        state.delayed.insert(ix, (next_time, runnable));
     }
 
     fn poll(&self) -> bool {
         let mut state = self.state.lock();
 
-        while let Some((deadline, _)) = state.delayed.first_key_value() {
+        while let Some((deadline, _)) = state.delayed.first() {
             if *deadline > state.time {
                 break;
             }
-            let (_, runnable) = state.delayed.pop_first().unwrap();
+            let (_, runnable) = state.delayed.remove(0);
             state.background.push(runnable);
         }
 
@@ -134,8 +137,10 @@ impl PlatformDispatcher for TestDispatcher {
         let background_len = state.background.len();
 
         if foreground_len == 0 && background_len == 0 {
+            eprintln!("no runnables to poll");
             return false;
         }
+        eprintln!("runnables {} {}", foreground_len, background_len);
 
         let main_thread = state.random.gen_ratio(
             foreground_len as u32,
@@ -145,6 +150,7 @@ impl PlatformDispatcher for TestDispatcher {
         state.is_main_thread = main_thread;
 
         let runnable = if main_thread {
+            eprintln!("running next main thread");
             let state = &mut *state;
             let runnables = state
                 .foreground
@@ -155,6 +161,7 @@ impl PlatformDispatcher for TestDispatcher {
             runnables.pop_front().unwrap()
         } else {
             let ix = state.random.gen_range(0..background_len);
+            eprintln!("running background thread {ix}");
             state.background.swap_remove(ix)
         };
 

crates/language2/Cargo.toml 🔗

@@ -25,11 +25,10 @@ test-support = [
 clock = { path = "../clock" }
 collections = { path = "../collections" }
 fuzzy2 = { path = "../fuzzy2" }
-fs = { path = "../fs" }
 git = { path = "../git" }
 gpui2 = { path = "../gpui2" }
 lsp2 = { path = "../lsp2" }
-rpc = { path = "../rpc" }
+rpc2 = { path = "../rpc2" }
 settings2 = { path = "../settings2" }
 sum_tree = { path = "../sum_tree" }
 text = { path = "../text" }

crates/language2/src/buffer.rs 🔗

@@ -226,7 +226,7 @@ pub trait File: Send + Sync {
 
     fn as_any(&self) -> &dyn Any;
 
-    fn to_proto(&self) -> rpc::proto::File;
+    fn to_proto(&self) -> rpc2::proto::File;
 }
 
 pub trait LocalFile: File {
@@ -375,7 +375,7 @@ impl Buffer {
             file,
         );
         this.text.set_line_ending(proto::deserialize_line_ending(
-            rpc::proto::LineEnding::from_i32(message.line_ending)
+            rpc2::proto::LineEnding::from_i32(message.line_ending)
                 .ok_or_else(|| anyhow!("missing line_ending"))?,
         ));
         this.saved_version = proto::deserialize_version(&message.saved_version);

crates/language2/src/language2.rs 🔗

@@ -1862,111 +1862,112 @@ pub fn range_from_lsp(range: lsp2::Range) -> Range<Unclipped<PointUtf16>> {
     start..end
 }
 
-// #[cfg(test)]
-// mod tests {
-//     use super::*;
-//     use gpui::TestAppContext;
-
-//     #[gpui::test(iterations = 10)]
-//     async fn test_first_line_pattern(cx: &mut TestAppContext) {
-//         let mut languages = LanguageRegistry::test();
-//         languages.set_executor(cx.background());
-//         let languages = Arc::new(languages);
-//         languages.register(
-//             "/javascript",
-//             LanguageConfig {
-//                 name: "JavaScript".into(),
-//                 path_suffixes: vec!["js".into()],
-//                 first_line_pattern: Some(Regex::new(r"\bnode\b").unwrap()),
-//                 ..Default::default()
-//             },
-//             tree_sitter_typescript::language_tsx(),
-//             vec![],
-//             |_| Default::default(),
-//         );
-
-//         languages
-//             .language_for_file("the/script", None)
-//             .await
-//             .unwrap_err();
-//         languages
-//             .language_for_file("the/script", Some(&"nothing".into()))
-//             .await
-//             .unwrap_err();
-//         assert_eq!(
-//             languages
-//                 .language_for_file("the/script", Some(&"#!/bin/env node".into()))
-//                 .await
-//                 .unwrap()
-//                 .name()
-//                 .as_ref(),
-//             "JavaScript"
-//         );
-//     }
-
-//     #[gpui::test(iterations = 10)]
-//     async fn test_language_loading(cx: &mut TestAppContext) {
-//         let mut languages = LanguageRegistry::test();
-//         languages.set_executor(cx.background());
-//         let languages = Arc::new(languages);
-//         languages.register(
-//             "/JSON",
-//             LanguageConfig {
-//                 name: "JSON".into(),
-//                 path_suffixes: vec!["json".into()],
-//                 ..Default::default()
-//             },
-//             tree_sitter_json::language(),
-//             vec![],
-//             |_| Default::default(),
-//         );
-//         languages.register(
-//             "/rust",
-//             LanguageConfig {
-//                 name: "Rust".into(),
-//                 path_suffixes: vec!["rs".into()],
-//                 ..Default::default()
-//             },
-//             tree_sitter_rust::language(),
-//             vec![],
-//             |_| Default::default(),
-//         );
-//         assert_eq!(
-//             languages.language_names(),
-//             &[
-//                 "JSON".to_string(),
-//                 "Plain Text".to_string(),
-//                 "Rust".to_string(),
-//             ]
-//         );
-
-//         let rust1 = languages.language_for_name("Rust");
-//         let rust2 = languages.language_for_name("Rust");
-
-//         // Ensure language is still listed even if it's being loaded.
-//         assert_eq!(
-//             languages.language_names(),
-//             &[
-//                 "JSON".to_string(),
-//                 "Plain Text".to_string(),
-//                 "Rust".to_string(),
-//             ]
-//         );
-
-//         let (rust1, rust2) = futures::join!(rust1, rust2);
-//         assert!(Arc::ptr_eq(&rust1.unwrap(), &rust2.unwrap()));
-
-//         // Ensure language is still listed even after loading it.
-//         assert_eq!(
-//             languages.language_names(),
-//             &[
-//                 "JSON".to_string(),
-//                 "Plain Text".to_string(),
-//                 "Rust".to_string(),
-//             ]
-//         );
-
-//         // Loading an unknown language returns an error.
-//         assert!(languages.language_for_name("Unknown").await.is_err());
-//     }
-// }
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use gpui2::TestAppContext;
+
+    #[gpui2::test(iterations = 10)]
+    async fn test_first_line_pattern(cx: &mut TestAppContext) {
+        let mut languages = LanguageRegistry::test();
+
+        languages.set_executor(cx.executor().clone());
+        let languages = Arc::new(languages);
+        languages.register(
+            "/javascript",
+            LanguageConfig {
+                name: "JavaScript".into(),
+                path_suffixes: vec!["js".into()],
+                first_line_pattern: Some(Regex::new(r"\bnode\b").unwrap()),
+                ..Default::default()
+            },
+            tree_sitter_typescript::language_tsx(),
+            vec![],
+            |_| Default::default(),
+        );
+
+        languages
+            .language_for_file("the/script", None)
+            .await
+            .unwrap_err();
+        languages
+            .language_for_file("the/script", Some(&"nothing".into()))
+            .await
+            .unwrap_err();
+        assert_eq!(
+            languages
+                .language_for_file("the/script", Some(&"#!/bin/env node".into()))
+                .await
+                .unwrap()
+                .name()
+                .as_ref(),
+            "JavaScript"
+        );
+    }
+
+    #[gpui2::test(iterations = 10)]
+    async fn test_language_loading(cx: &mut TestAppContext) {
+        let mut languages = LanguageRegistry::test();
+        languages.set_executor(cx.executor().clone());
+        let languages = Arc::new(languages);
+        languages.register(
+            "/JSON",
+            LanguageConfig {
+                name: "JSON".into(),
+                path_suffixes: vec!["json".into()],
+                ..Default::default()
+            },
+            tree_sitter_json::language(),
+            vec![],
+            |_| Default::default(),
+        );
+        languages.register(
+            "/rust",
+            LanguageConfig {
+                name: "Rust".into(),
+                path_suffixes: vec!["rs".into()],
+                ..Default::default()
+            },
+            tree_sitter_rust::language(),
+            vec![],
+            |_| Default::default(),
+        );
+        assert_eq!(
+            languages.language_names(),
+            &[
+                "JSON".to_string(),
+                "Plain Text".to_string(),
+                "Rust".to_string(),
+            ]
+        );
+
+        let rust1 = languages.language_for_name("Rust");
+        let rust2 = languages.language_for_name("Rust");
+
+        // Ensure language is still listed even if it's being loaded.
+        assert_eq!(
+            languages.language_names(),
+            &[
+                "JSON".to_string(),
+                "Plain Text".to_string(),
+                "Rust".to_string(),
+            ]
+        );
+
+        let (rust1, rust2) = futures::join!(rust1, rust2);
+        assert!(Arc::ptr_eq(&rust1.unwrap(), &rust2.unwrap()));
+
+        // Ensure language is still listed even after loading it.
+        assert_eq!(
+            languages.language_names(),
+            &[
+                "JSON".to_string(),
+                "Plain Text".to_string(),
+                "Rust".to_string(),
+            ]
+        );
+
+        // Loading an unknown language returns an error.
+        assert!(languages.language_for_name("Unknown").await.is_err());
+    }
+}

crates/language2/src/proto.rs 🔗

@@ -5,7 +5,7 @@ use crate::{
 use anyhow::{anyhow, Result};
 use clock::ReplicaId;
 use lsp2::{DiagnosticSeverity, LanguageServerId};
-use rpc::proto;
+use rpc2::proto;
 use std::{ops::Range, sync::Arc};
 use text::*;
 

crates/project2/Cargo.toml 🔗

@@ -34,7 +34,7 @@ language2 = { path = "../language2" }
 lsp2 = { path = "../lsp2" }
 node_runtime = { path = "../node_runtime" }
 prettier2 = { path = "../prettier2" }
-rpc = { path = "../rpc" }
+rpc2 = { path = "../rpc2" }
 settings2 = { path = "../settings2" }
 sum_tree = { path = "../sum_tree" }
 terminal2 = { path = "../terminal2" }
@@ -78,7 +78,7 @@ lsp2 = { path = "../lsp2", features = ["test-support"] }
 settings2 = { path = "../settings2", features = ["test-support"] }
 prettier2 = { path = "../prettier2", features = ["test-support"] }
 util = { path = "../util", features = ["test-support"] }
-rpc = { path = "../rpc", features = ["test-support"] }
+rpc2 = { path = "../rpc2", features = ["test-support"] }
 git2.workspace = true
 tempdir.workspace = true
 unindent.workspace = true

crates/project2/src/worktree.rs 🔗

@@ -2646,8 +2646,8 @@ impl language2::File for File {
         self
     }
 
-    fn to_proto(&self) -> rpc::proto::File {
-        rpc::proto::File {
+    fn to_proto(&self) -> rpc2::proto::File {
+        rpc2::proto::File {
             worktree_id: self.worktree.entity_id().as_u64(),
             entry_id: self.entry_id.to_proto(),
             path: self.path.to_string_lossy().into(),
@@ -2713,7 +2713,7 @@ impl File {
     }
 
     pub fn from_proto(
-        proto: rpc::proto::File,
+        proto: rpc2::proto::File,
         worktree: Handle<Worktree>,
         cx: &AppContext,
     ) -> Result<Self> {

crates/rpc2/Cargo.toml 🔗

@@ -0,0 +1,44 @@
+[package]
+description = "Shared logic for communication between the Zed app and the zed.dev server"
+edition = "2021"
+name = "rpc2"
+version = "0.1.0"
+publish = false
+
+[lib]
+path = "src/rpc.rs"
+doctest = false
+
+[features]
+test-support = ["collections/test-support", "gpui2/test-support"]
+
+[dependencies]
+clock = { path = "../clock" }
+collections = { path = "../collections" }
+gpui2 = { path = "../gpui2", optional = true }
+util = { path = "../util" }
+anyhow.workspace = true
+async-lock = "2.4"
+async-tungstenite = "0.16"
+base64 = "0.13"
+futures.workspace = true
+parking_lot.workspace = true
+prost.workspace = true
+rand.workspace = true
+rsa = "0.4"
+serde.workspace = true
+serde_derive.workspace = true
+smol-timeout = "0.6"
+tracing = { version = "0.1.34", features = ["log"] }
+zstd = "0.11"
+
+[build-dependencies]
+prost-build = "0.9"
+
+[dev-dependencies]
+collections = { path = "../collections", features = ["test-support"] }
+gpui2 = { path = "../gpui2", features = ["test-support"] }
+smol.workspace = true
+tempdir.workspace = true
+ctor.workspace = true
+env_logger.workspace = true

crates/rpc2/build.rs 🔗

@@ -0,0 +1,8 @@
+fn main() {
+    let mut build = prost_build::Config::new();
+    // build.protoc_arg("--experimental_allow_proto3_optional");
+    build
+        .type_attribute(".", "#[derive(serde::Serialize)]")
+        .compile_protos(&["proto/zed.proto"], &["proto"])
+        .unwrap();
+}

crates/rpc2/proto/zed.proto 🔗

@@ -0,0 +1,1559 @@
+syntax = "proto3";
+package zed.messages;
+
+// Looking for a number? Search "// Current max"
+
+message PeerId {
+    uint32 owner_id = 1;
+    uint32 id = 2;
+}
+
+message Envelope {
+    uint32 id = 1;
+    optional uint32 responding_to = 2;
+    optional PeerId original_sender_id = 3;
+    oneof payload {
+        Hello hello = 4;
+        Ack ack = 5;
+        Error error = 6;
+        Ping ping = 7;
+        Test test = 8;
+
+        CreateRoom create_room = 9;
+        CreateRoomResponse create_room_response = 10;
+        JoinRoom join_room = 11;
+        JoinRoomResponse join_room_response = 12;
+        RejoinRoom rejoin_room = 13;
+        RejoinRoomResponse rejoin_room_response = 14;
+        LeaveRoom leave_room = 15;
+        Call call = 16;
+        IncomingCall incoming_call = 17;
+        CallCanceled call_canceled = 18;
+        CancelCall cancel_call = 19;
+        DeclineCall decline_call = 20;
+        UpdateParticipantLocation update_participant_location = 21;
+        RoomUpdated room_updated = 22;
+
+        ShareProject share_project = 23;
+        ShareProjectResponse share_project_response = 24;
+        UnshareProject unshare_project = 25;
+        JoinProject join_project = 26;
+        JoinProjectResponse join_project_response = 27;
+        LeaveProject leave_project = 28;
+        AddProjectCollaborator add_project_collaborator = 29;
+        UpdateProjectCollaborator update_project_collaborator = 30;
+        RemoveProjectCollaborator remove_project_collaborator = 31;
+
+        GetDefinition get_definition = 32;
+        GetDefinitionResponse get_definition_response = 33;
+        GetTypeDefinition get_type_definition = 34;
+        GetTypeDefinitionResponse get_type_definition_response = 35;
+        GetReferences get_references = 36;
+        GetReferencesResponse get_references_response = 37;
+        GetDocumentHighlights get_document_highlights = 38;
+        GetDocumentHighlightsResponse get_document_highlights_response = 39;
+        GetProjectSymbols get_project_symbols = 40;
+        GetProjectSymbolsResponse get_project_symbols_response = 41;
+        OpenBufferForSymbol open_buffer_for_symbol = 42;
+        OpenBufferForSymbolResponse open_buffer_for_symbol_response = 43;
+
+        UpdateProject update_project = 44;
+        UpdateWorktree update_worktree = 45;
+
+        CreateProjectEntry create_project_entry = 46;
+        RenameProjectEntry rename_project_entry = 47;
+        CopyProjectEntry copy_project_entry = 48;
+        DeleteProjectEntry delete_project_entry = 49;
+        ProjectEntryResponse project_entry_response = 50;
+        ExpandProjectEntry expand_project_entry = 51;
+        ExpandProjectEntryResponse expand_project_entry_response = 52;
+
+        UpdateDiagnosticSummary update_diagnostic_summary = 53;
+        StartLanguageServer start_language_server = 54;
+        UpdateLanguageServer update_language_server = 55;
+
+        OpenBufferById open_buffer_by_id = 56;
+        OpenBufferByPath open_buffer_by_path = 57;
+        OpenBufferResponse open_buffer_response = 58;
+        CreateBufferForPeer create_buffer_for_peer = 59;
+        UpdateBuffer update_buffer = 60;
+        UpdateBufferFile update_buffer_file = 61;
+        SaveBuffer save_buffer = 62;
+        BufferSaved buffer_saved = 63;
+        BufferReloaded buffer_reloaded = 64;
+        ReloadBuffers reload_buffers = 65;
+        ReloadBuffersResponse reload_buffers_response = 66;
+        SynchronizeBuffers synchronize_buffers = 67;
+        SynchronizeBuffersResponse synchronize_buffers_response = 68;
+        FormatBuffers format_buffers = 69;
+        FormatBuffersResponse format_buffers_response = 70;
+        GetCompletions get_completions = 71;
+        GetCompletionsResponse get_completions_response = 72;
+        ApplyCompletionAdditionalEdits apply_completion_additional_edits = 73;
+        ApplyCompletionAdditionalEditsResponse apply_completion_additional_edits_response = 74;
+        GetCodeActions get_code_actions = 75;
+        GetCodeActionsResponse get_code_actions_response = 76;
+        GetHover get_hover = 77;
+        GetHoverResponse get_hover_response = 78;
+        ApplyCodeAction apply_code_action = 79;
+        ApplyCodeActionResponse apply_code_action_response = 80;
+        PrepareRename prepare_rename = 81;
+        PrepareRenameResponse prepare_rename_response = 82;
+        PerformRename perform_rename = 83;
+        PerformRenameResponse perform_rename_response = 84;
+        SearchProject search_project = 85;
+        SearchProjectResponse search_project_response = 86;
+
+        UpdateContacts update_contacts = 87;
+        UpdateInviteInfo update_invite_info = 88;
+        ShowContacts show_contacts = 89;
+
+        GetUsers get_users = 90;
+        FuzzySearchUsers fuzzy_search_users = 91;
+        UsersResponse users_response = 92;
+        RequestContact request_contact = 93;
+        RespondToContactRequest respond_to_contact_request = 94;
+        RemoveContact remove_contact = 95;
+
+        Follow follow = 96;
+        FollowResponse follow_response = 97;
+        UpdateFollowers update_followers = 98;
+        Unfollow unfollow = 99;
+        GetPrivateUserInfo get_private_user_info = 100;
+        GetPrivateUserInfoResponse get_private_user_info_response = 101;
+        UpdateDiffBase update_diff_base = 102;
+
+        OnTypeFormatting on_type_formatting = 103;
+        OnTypeFormattingResponse on_type_formatting_response = 104;
+
+        UpdateWorktreeSettings update_worktree_settings = 105;
+
+        InlayHints inlay_hints = 106;
+        InlayHintsResponse inlay_hints_response = 107;
+        ResolveInlayHint resolve_inlay_hint = 108;
+        ResolveInlayHintResponse resolve_inlay_hint_response = 109;
+        RefreshInlayHints refresh_inlay_hints = 110;
+
+        CreateChannel create_channel = 111;
+        CreateChannelResponse create_channel_response = 112;
+        InviteChannelMember invite_channel_member = 113;
+        RemoveChannelMember remove_channel_member = 114;
+        RespondToChannelInvite respond_to_channel_invite = 115;
+        UpdateChannels update_channels = 116;
+        JoinChannel join_channel = 117;
+        DeleteChannel delete_channel = 118;
+        GetChannelMembers get_channel_members = 119;
+        GetChannelMembersResponse get_channel_members_response = 120;
+        SetChannelMemberAdmin set_channel_member_admin = 121;
+        RenameChannel rename_channel = 122;
+        RenameChannelResponse rename_channel_response = 123;
+
+        JoinChannelBuffer join_channel_buffer = 124;
+        JoinChannelBufferResponse join_channel_buffer_response = 125;
+        UpdateChannelBuffer update_channel_buffer = 126;
+        LeaveChannelBuffer leave_channel_buffer = 127;
+        UpdateChannelBufferCollaborators update_channel_buffer_collaborators = 128;
+        RejoinChannelBuffers rejoin_channel_buffers = 129;
+        RejoinChannelBuffersResponse rejoin_channel_buffers_response = 130;
+        AckBufferOperation ack_buffer_operation = 143;
+
+        JoinChannelChat join_channel_chat = 131;
+        JoinChannelChatResponse join_channel_chat_response = 132;
+        LeaveChannelChat leave_channel_chat = 133;
+        SendChannelMessage send_channel_message = 134;
+        SendChannelMessageResponse send_channel_message_response = 135;
+        ChannelMessageSent channel_message_sent = 136;
+        GetChannelMessages get_channel_messages = 137;
+        GetChannelMessagesResponse get_channel_messages_response = 138;
+        RemoveChannelMessage remove_channel_message = 139;
+        AckChannelMessage ack_channel_message = 144;
+
+        LinkChannel link_channel = 140;
+        UnlinkChannel unlink_channel = 141;
+        MoveChannel move_channel = 142; // current max: 144
+    }
+}
+
+// Messages
+
+message Hello {
+    PeerId peer_id = 1;
+}
+
+message Ping {}
+
+message Ack {}
+
+message Error {
+    string message = 1;
+}
+
+message Test {
+    uint64 id = 1;
+}
+
+message CreateRoom {}
+
+message CreateRoomResponse {
+    Room room = 1;
+    optional LiveKitConnectionInfo live_kit_connection_info = 2;
+}
+
+message JoinRoom {
+    uint64 id = 1;
+}
+
+message JoinRoomResponse {
+    Room room = 1;
+    optional uint64 channel_id = 2;
+    optional LiveKitConnectionInfo live_kit_connection_info = 3;
+}
+
+message RejoinRoom {
+    uint64 id = 1;
+    repeated UpdateProject reshared_projects = 2;
+    repeated RejoinProject rejoined_projects = 3;
+}
+
+message RejoinProject {
+    uint64 id = 1;
+    repeated RejoinWorktree worktrees = 2;
+}
+
+message RejoinWorktree {
+    uint64 id = 1;
+    uint64 scan_id = 2;
+}
+
+message RejoinRoomResponse {
+    Room room = 1;
+    repeated ResharedProject reshared_projects = 2;
+    repeated RejoinedProject rejoined_projects = 3;
+}
+
+message ResharedProject {
+    uint64 id = 1;
+    repeated Collaborator collaborators = 2;
+}
+
+message RejoinedProject {
+    uint64 id = 1;
+    repeated WorktreeMetadata worktrees = 2;
+    repeated Collaborator collaborators = 3;
+    repeated LanguageServer language_servers = 4;
+}
+
+message LeaveRoom {}
+
+message Room {
+    uint64 id = 1;
+    repeated Participant participants = 2;
+    repeated PendingParticipant pending_participants = 3;
+    repeated Follower followers = 4;
+    string live_kit_room = 5;
+}
+
+message Participant {
+    uint64 user_id = 1;
+    PeerId peer_id = 2;
+    repeated ParticipantProject projects = 3;
+    ParticipantLocation location = 4;
+    uint32 participant_index = 5;
+}
+
+message PendingParticipant {
+    uint64 user_id = 1;
+    uint64 calling_user_id = 2;
+    optional uint64 initial_project_id = 3;
+}
+
+message ParticipantProject {
+    uint64 id = 1;
+    repeated string worktree_root_names = 2;
+}
+
+message Follower {
+    PeerId leader_id = 1;
+    PeerId follower_id = 2;
+    uint64 project_id = 3;
+}
+
+message ParticipantLocation {
+    oneof variant {
+        SharedProject shared_project = 1;
+        UnsharedProject unshared_project = 2;
+        External external = 3;
+    }
+
+    message SharedProject {
+        uint64 id = 1;
+    }
+
+    message UnsharedProject {}
+
+    message External {}
+}
+
+message Call {
+    uint64 room_id = 1;
+    uint64 called_user_id = 2;
+    optional uint64 initial_project_id = 3;
+}
+
+message IncomingCall {
+    uint64 room_id = 1;
+    uint64 calling_user_id = 2;
+    repeated uint64 participant_user_ids = 3;
+    optional ParticipantProject initial_project = 4;
+}
+
+message CallCanceled {
+    uint64 room_id = 1;
+}
+
+message CancelCall {
+    uint64 room_id = 1;
+    uint64 called_user_id = 2;
+}
+
+message DeclineCall {
+    uint64 room_id = 1;
+}
+
+message UpdateParticipantLocation {
+    uint64 room_id = 1;
+    ParticipantLocation location = 2;
+}
+
+message RoomUpdated {
+    Room room = 1;
+}
+
+message LiveKitConnectionInfo {
+    string server_url = 1;
+    string token = 2;
+}
+
+message ShareProject {
+    uint64 room_id = 1;
+    repeated WorktreeMetadata worktrees = 2;
+}
+
+message ShareProjectResponse {
+    uint64 project_id = 1;
+}
+
+message UnshareProject {
+    uint64 project_id = 1;
+}
+
+message UpdateProject {
+    uint64 project_id = 1;
+    repeated WorktreeMetadata worktrees = 2;
+}
+
+message JoinProject {
+    uint64 project_id = 1;
+}
+
+message JoinProjectResponse {
+    uint32 replica_id = 1;
+    repeated WorktreeMetadata worktrees = 2;
+    repeated Collaborator collaborators = 3;
+    repeated LanguageServer language_servers = 4;
+}
+
+message LeaveProject {
+    uint64 project_id = 1;
+}
+
+message UpdateWorktree {
+    uint64 project_id = 1;
+    uint64 worktree_id = 2;
+    string root_name = 3;
+    repeated Entry updated_entries = 4;
+    repeated uint64 removed_entries = 5;
+    repeated RepositoryEntry updated_repositories = 6;
+    repeated uint64 removed_repositories = 7;
+    uint64 scan_id = 8;
+    bool is_last_update = 9;
+    string abs_path = 10;
+}
+
+message UpdateWorktreeSettings {
+    uint64 project_id = 1;
+    uint64 worktree_id = 2;
+    string path = 3;
+    optional string content = 4;
+}
+
+message CreateProjectEntry {
+    uint64 project_id = 1;
+    uint64 worktree_id = 2;
+    string path = 3;
+    bool is_directory = 4;
+}
+
+message RenameProjectEntry {
+    uint64 project_id = 1;
+    uint64 entry_id = 2;
+    string new_path = 3;
+}
+
+message CopyProjectEntry {
+    uint64 project_id = 1;
+    uint64 entry_id = 2;
+    string new_path = 3;
+}
+
+message DeleteProjectEntry {
+    uint64 project_id = 1;
+    uint64 entry_id = 2;
+}
+
+message ExpandProjectEntry {
+    uint64 project_id = 1;
+    uint64 entry_id = 2;
+}
+
+message ExpandProjectEntryResponse {
+    uint64 worktree_scan_id = 1;
+}
+
+message ProjectEntryResponse {
+    Entry entry = 1;
+    uint64 worktree_scan_id = 2;
+}
+
+message AddProjectCollaborator {
+    uint64 project_id = 1;
+    Collaborator collaborator = 2;
+}
+
+message UpdateProjectCollaborator {
+    uint64 project_id = 1;
+    PeerId old_peer_id = 2;
+    PeerId new_peer_id = 3;
+}
+
+message RemoveProjectCollaborator {
+    uint64 project_id = 1;
+    PeerId peer_id = 2;
+}
+
+message UpdateChannelBufferCollaborators {
+    uint64 channel_id = 1;
+    repeated Collaborator collaborators = 2;
+}
+
+message GetDefinition {
+     uint64 project_id = 1;
+     uint64 buffer_id = 2;
+     Anchor position = 3;
+     repeated VectorClockEntry version = 4;
+ }
+
+message GetDefinitionResponse {
+    repeated LocationLink links = 1;
+}
+
+message GetTypeDefinition {
+     uint64 project_id = 1;
+     uint64 buffer_id = 2;
+     Anchor position = 3;
+     repeated VectorClockEntry version = 4;
+ }
+
+message GetTypeDefinitionResponse {
+    repeated LocationLink links = 1;
+}
+
+message GetReferences {
+     uint64 project_id = 1;
+     uint64 buffer_id = 2;
+     Anchor position = 3;
+     repeated VectorClockEntry version = 4;
+ }
+
+message GetReferencesResponse {
+    repeated Location locations = 1;
+}
+
+message GetDocumentHighlights {
+     uint64 project_id = 1;
+     uint64 buffer_id = 2;
+     Anchor position = 3;
+     repeated VectorClockEntry version = 4;
+ }
+
+message GetDocumentHighlightsResponse {
+    repeated DocumentHighlight highlights = 1;
+}
+
+message Location {
+    uint64 buffer_id = 1;
+    Anchor start = 2;
+    Anchor end = 3;
+}
+
+message LocationLink {
+    optional Location origin = 1;
+    Location target = 2;
+}
+
+message DocumentHighlight {
+    Kind kind = 1;
+    Anchor start = 2;
+    Anchor end = 3;
+
+    enum Kind {
+        Text = 0;
+        Read = 1;
+        Write = 2;
+    }
+}
+
+message GetProjectSymbols {
+    uint64 project_id = 1;
+    string query = 2;
+}
+
+message GetProjectSymbolsResponse {
+    repeated Symbol symbols = 4;
+}
+
+message Symbol {
+    uint64 source_worktree_id = 1;
+    uint64 worktree_id = 2;
+    string language_server_name = 3;
+    string name = 4;
+    int32 kind = 5;
+    string path = 6;
+    // Cannot use generate anchors for unopened files,
+    // so we are forced to use point coords instead
+    PointUtf16 start = 7;
+    PointUtf16 end = 8;
+    bytes signature = 9;
+}
+
+message OpenBufferForSymbol {
+    uint64 project_id = 1;
+    Symbol symbol = 2;
+}
+
+message OpenBufferForSymbolResponse {
+    uint64 buffer_id = 1;
+}
+
+message OpenBufferByPath {
+    uint64 project_id = 1;
+    uint64 worktree_id = 2;
+    string path = 3;
+}
+
+message OpenBufferById {
+    uint64 project_id = 1;
+    uint64 id = 2;
+}
+
+message OpenBufferResponse {
+    uint64 buffer_id = 1;
+}
+
+message CreateBufferForPeer {
+    uint64 project_id = 1;
+    PeerId peer_id = 2;
+    oneof variant {
+        BufferState state = 3;
+        BufferChunk chunk = 4;
+    }
+}
+
+message UpdateBuffer {
+    uint64 project_id = 1;
+    uint64 buffer_id = 2;
+    repeated Operation operations = 3;
+}
+
+message UpdateChannelBuffer {
+    uint64 channel_id = 1;
+    repeated Operation operations = 2;
+}
+
+message UpdateBufferFile {
+    uint64 project_id = 1;
+    uint64 buffer_id = 2;
+    File file = 3;
+}
+
+message SaveBuffer {
+    uint64 project_id = 1;
+    uint64 buffer_id = 2;
+    repeated VectorClockEntry version = 3;
+}
+
+message BufferSaved {
+    uint64 project_id = 1;
+    uint64 buffer_id = 2;
+    repeated VectorClockEntry version = 3;
+    Timestamp mtime = 4;
+    string fingerprint = 5;
+}
+
+message BufferReloaded {
+    uint64 project_id = 1;
+    uint64 buffer_id = 2;
+    repeated VectorClockEntry version = 3;
+    Timestamp mtime = 4;
+    string fingerprint = 5;
+    LineEnding line_ending = 6;
+}
+
+message ReloadBuffers {
+    uint64 project_id = 1;
+    repeated uint64 buffer_ids = 2;
+}
+
+message ReloadBuffersResponse {
+    ProjectTransaction transaction = 1;
+}
+
+message SynchronizeBuffers {
+    uint64 project_id = 1;
+    repeated BufferVersion buffers = 2;
+}
+
+message SynchronizeBuffersResponse {
+    repeated BufferVersion buffers = 1;
+}
+
+message BufferVersion {
+    uint64 id = 1;
+    repeated VectorClockEntry version = 2;
+}
+
+message ChannelBufferVersion {
+    uint64 channel_id = 1;
+    repeated VectorClockEntry version = 2;
+    uint64 epoch = 3;
+}
+
+enum FormatTrigger {
+    Save = 0;
+    Manual = 1;
+}
+
+message FormatBuffers {
+    uint64 project_id = 1;
+    FormatTrigger trigger = 2;
+    repeated uint64 buffer_ids = 3;
+}
+
+message FormatBuffersResponse {
+    ProjectTransaction transaction = 1;
+}
+
+message GetCompletions {
+    uint64 project_id = 1;
+    uint64 buffer_id = 2;
+    Anchor position = 3;
+    repeated VectorClockEntry version = 4;
+}
+
+message GetCompletionsResponse {
+    repeated Completion completions = 1;
+    repeated VectorClockEntry version = 2;
+}
+
+message ApplyCompletionAdditionalEdits {
+    uint64 project_id = 1;
+    uint64 buffer_id = 2;
+    Completion completion = 3;
+}
+
+message ApplyCompletionAdditionalEditsResponse {
+    Transaction transaction = 1;
+}
+
+message Completion {
+    Anchor old_start = 1;
+    Anchor old_end = 2;
+    string new_text = 3;
+    uint64 server_id = 4;
+    bytes lsp_completion = 5;
+}
+
+message GetCodeActions {
+    uint64 project_id = 1;
+    uint64 buffer_id = 2;
+    Anchor start = 3;
+    Anchor end = 4;
+    repeated VectorClockEntry version = 5;
+}
+
+message GetCodeActionsResponse {
+    repeated CodeAction actions = 1;
+    repeated VectorClockEntry version = 2;
+}
+
+message GetHover {
+    uint64 project_id = 1;
+    uint64 buffer_id = 2;
+    Anchor position = 3;
+    repeated VectorClockEntry version = 5;
+}
+
+message GetHoverResponse {
+    optional Anchor start = 1;
+    optional Anchor end = 2;
+    repeated HoverBlock contents = 3;
+}
+
+message HoverBlock {
+    string text = 1;
+    optional string language = 2;
+    bool is_markdown = 3;
+}
+
+message ApplyCodeAction {
+    uint64 project_id = 1;
+    uint64 buffer_id = 2;
+    CodeAction action = 3;
+}
+
+message ApplyCodeActionResponse {
+    ProjectTransaction transaction = 1;
+}
+
+message PrepareRename {
+    uint64 project_id = 1;
+    uint64 buffer_id = 2;
+    Anchor position = 3;
+    repeated VectorClockEntry version = 4;
+}
+
+message PrepareRenameResponse {
+    bool can_rename = 1;
+    Anchor start = 2;
+    Anchor end = 3;
+    repeated VectorClockEntry version = 4;
+}
+
+message PerformRename {
+    uint64 project_id = 1;
+    uint64 buffer_id = 2;
+    Anchor position = 3;
+    string new_name = 4;
+    repeated VectorClockEntry version = 5;
+}
+
+message OnTypeFormatting {
+    uint64 project_id = 1;
+    uint64 buffer_id = 2;
+    Anchor position = 3;
+    string trigger = 4;
+    repeated VectorClockEntry version = 5;
+}
+
+message OnTypeFormattingResponse {
+    Transaction transaction = 1;
+}
+
+message InlayHints {
+    uint64 project_id = 1;
+    uint64 buffer_id = 2;
+    Anchor start = 3;
+    Anchor end = 4;
+    repeated VectorClockEntry version = 5;
+}
+
+message InlayHintsResponse {
+    repeated InlayHint hints = 1;
+    repeated VectorClockEntry version = 2;
+}
+
+message InlayHint {
+    Anchor position = 1;
+    InlayHintLabel label = 2;
+    optional string kind = 3;
+    bool padding_left = 4;
+    bool padding_right = 5;
+    InlayHintTooltip tooltip = 6;
+    ResolveState resolve_state = 7;
+}
+
+message InlayHintLabel {
+    oneof label {
+        string value = 1;
+        InlayHintLabelParts label_parts = 2;
+    }
+}
+
+message InlayHintLabelParts {
+    repeated InlayHintLabelPart parts = 1;
+}
+
+message InlayHintLabelPart {
+    string value = 1;
+    InlayHintLabelPartTooltip tooltip = 2;
+    optional string location_url = 3;
+    PointUtf16 location_range_start = 4;
+    PointUtf16 location_range_end = 5;
+    optional uint64 language_server_id = 6;
+}
+
+message InlayHintTooltip {
+    oneof content {
+        string value = 1;
+        MarkupContent markup_content = 2;
+    }
+}
+
+message InlayHintLabelPartTooltip {
+    oneof content {
+        string value = 1;
+        MarkupContent markup_content = 2;
+    }
+}
+
+message ResolveState {
+    State state = 1;
+    LspResolveState lsp_resolve_state = 2;
+
+    enum State {
+        Resolved = 0;
+        CanResolve = 1;
+        Resolving = 2;
+    }
+
+    message LspResolveState {
+        string value = 1;
+        uint64 server_id = 2;
+    }
+}
+
+message ResolveInlayHint {
+    uint64 project_id = 1;
+    uint64 buffer_id = 2;
+    uint64 language_server_id = 3;
+    InlayHint hint = 4;
+}
+
+message ResolveInlayHintResponse {
+    InlayHint hint = 1;
+}
+
+message RefreshInlayHints {
+    uint64 project_id = 1;
+}
+
+message MarkupContent {
+    bool is_markdown = 1;
+    string value = 2;
+}
+
+message PerformRenameResponse {
+    ProjectTransaction transaction = 2;
+}
+
+message SearchProject {
+    uint64 project_id = 1;
+    string query = 2;
+    bool regex = 3;
+    bool whole_word = 4;
+    bool case_sensitive = 5;
+    string files_to_include = 6;
+    string files_to_exclude = 7;
+}
+
+message SearchProjectResponse {
+    repeated Location locations = 1;
+}
+
+message CodeAction {
+    uint64 server_id = 1;
+    Anchor start = 2;
+    Anchor end = 3;
+    bytes lsp_action = 4;
+}
+
+message ProjectTransaction {
+    repeated uint64 buffer_ids = 1;
+    repeated Transaction transactions = 2;
+}
+
+message Transaction {
+    LamportTimestamp id = 1;
+    repeated LamportTimestamp edit_ids = 2;
+    repeated VectorClockEntry start = 3;
+}
+
+message LamportTimestamp {
+    uint32 replica_id = 1;
+    uint32 value = 2;
+}
+
+message LanguageServer {
+    uint64 id = 1;
+    string name = 2;
+}
+
+message StartLanguageServer {
+    uint64 project_id = 1;
+    LanguageServer server = 2;
+}
+
+message UpdateDiagnosticSummary {
+    uint64 project_id = 1;
+    uint64 worktree_id = 2;
+    DiagnosticSummary summary = 3;
+}
+
+message DiagnosticSummary {
+    string path = 1;
+    uint64 language_server_id = 2;
+    uint32 error_count = 3;
+    uint32 warning_count = 4;
+}
+
+message UpdateLanguageServer {
+    uint64 project_id = 1;
+    uint64 language_server_id = 2;
+    oneof variant {
+        LspWorkStart work_start = 3;
+        LspWorkProgress work_progress = 4;
+        LspWorkEnd work_end = 5;
+        LspDiskBasedDiagnosticsUpdating disk_based_diagnostics_updating = 6;
+        LspDiskBasedDiagnosticsUpdated disk_based_diagnostics_updated = 7;
+    }
+}
+
+message LspWorkStart {
+    string token = 1;
+    optional string message = 2;
+    optional uint32 percentage = 3;
+}
+
+message LspWorkProgress {
+    string token = 1;
+    optional string message = 2;
+    optional uint32 percentage = 3;
+}
+
+message LspWorkEnd {
+    string token = 1;
+}
+
+message LspDiskBasedDiagnosticsUpdating {}
+
+message LspDiskBasedDiagnosticsUpdated {}
+
+message UpdateChannels {
+    repeated Channel channels = 1;
+    repeated ChannelEdge insert_edge = 2;
+    repeated ChannelEdge delete_edge = 3;
+    repeated uint64 delete_channels = 4;
+    repeated Channel channel_invitations = 5;
+    repeated uint64 remove_channel_invitations = 6;
+    repeated ChannelParticipants channel_participants = 7;
+    repeated ChannelPermission channel_permissions = 8;
+    repeated UnseenChannelMessage unseen_channel_messages = 9;
+    repeated UnseenChannelBufferChange unseen_channel_buffer_changes = 10;
+}
+
+message UnseenChannelMessage {
+    uint64 channel_id = 1;
+    uint64 message_id = 2;
+}
+
+message UnseenChannelBufferChange {
+    uint64 channel_id = 1;
+    uint64 epoch = 2;
+    repeated VectorClockEntry version = 3;
+}
+
+message ChannelEdge {
+    uint64 channel_id = 1;
+    uint64 parent_id = 2;
+}
+
+message ChannelPermission {
+    uint64 channel_id = 1;
+    bool is_admin = 2;
+}
+
+message ChannelParticipants {
+    uint64 channel_id = 1;
+    repeated uint64 participant_user_ids = 2;
+}
+
+message JoinChannel {
+    uint64 channel_id = 1;
+}
+
+message DeleteChannel {
+    uint64 channel_id = 1;
+}
+
+message GetChannelMembers {
+    uint64 channel_id = 1;
+}
+
+message GetChannelMembersResponse {
+    repeated ChannelMember members = 1;
+}
+
+message ChannelMember {
+    uint64 user_id = 1;
+    bool admin = 2;
+    Kind kind = 3;
+
+    enum Kind {
+        Member = 0;
+        Invitee = 1;
+        AncestorMember = 2;
+    }
+}
+
+message CreateChannel {
+    string name = 1;
+    optional uint64 parent_id = 2;
+}
+
+message CreateChannelResponse {
+    Channel channel = 1;
+    optional uint64 parent_id = 2;
+}
+
+message InviteChannelMember {
+    uint64 channel_id = 1;
+    uint64 user_id = 2;
+    bool admin = 3;
+}
+
+message RemoveChannelMember {
+    uint64 channel_id = 1;
+    uint64 user_id = 2;
+}
+
+message SetChannelMemberAdmin {
+    uint64 channel_id = 1;
+    uint64 user_id = 2;
+    bool admin = 3;
+}
+
+message RenameChannel {
+    uint64 channel_id = 1;
+    string name = 2;
+}
+
+message RenameChannelResponse {
+    Channel channel = 1;
+}
+
+message JoinChannelChat {
+    uint64 channel_id = 1;
+}
+
+message JoinChannelChatResponse {
+    repeated ChannelMessage messages = 1;
+    bool done = 2;
+}
+
+message LeaveChannelChat {
+    uint64 channel_id = 1;
+}
+
+message SendChannelMessage {
+    uint64 channel_id = 1;
+    string body = 2;
+    Nonce nonce = 3;
+}
+
+message RemoveChannelMessage {
+    uint64 channel_id = 1;
+    uint64 message_id = 2;
+}
+
+message AckChannelMessage {
+    uint64 channel_id = 1;
+    uint64 message_id = 2;
+}
+
+message SendChannelMessageResponse {
+    ChannelMessage message = 1;
+}
+
+message ChannelMessageSent {
+    uint64 channel_id = 1;
+    ChannelMessage message = 2;
+}
+
+message GetChannelMessages {
+    uint64 channel_id = 1;
+    uint64 before_message_id = 2;
+}
+
+message GetChannelMessagesResponse {
+    repeated ChannelMessage messages = 1;
+    bool done = 2;
+}
+
+message LinkChannel {
+    uint64 channel_id = 1;
+    uint64 to = 2;
+}
+
+message UnlinkChannel {
+    uint64 channel_id = 1;
+    uint64 from = 2;
+}
+
+message MoveChannel {
+    uint64 channel_id = 1;
+    uint64 from = 2;
+    uint64 to = 3;
+}
+
+message JoinChannelBuffer {
+    uint64 channel_id = 1;
+}
+
+message ChannelMessage {
+    uint64 id = 1;
+    string body = 2;
+    uint64 timestamp = 3;
+    uint64 sender_id = 4;
+    Nonce nonce = 5;
+}
+
+message RejoinChannelBuffers {
+    repeated ChannelBufferVersion buffers = 1;
+}
+
+message RejoinChannelBuffersResponse {
+    repeated RejoinedChannelBuffer buffers = 1;
+}
+
+message AckBufferOperation {
+    uint64 buffer_id = 1;
+    uint64 epoch = 2;
+    repeated VectorClockEntry version = 3;
+}
+
+message JoinChannelBufferResponse {
+    uint64 buffer_id = 1;
+    uint32 replica_id = 2;
+    string base_text = 3;
+    repeated Operation operations = 4;
+    repeated Collaborator collaborators = 5;
+    uint64 epoch = 6;
+}
+
+message RejoinedChannelBuffer {
+    uint64 channel_id = 1;
+    repeated VectorClockEntry version = 2;
+    repeated Operation operations = 3;
+    repeated Collaborator collaborators = 4;
+}
+
+message LeaveChannelBuffer {
+    uint64 channel_id = 1;
+}
+
+message RespondToChannelInvite {
+    uint64 channel_id = 1;
+    bool accept = 2;
+}
+
+message GetUsers {
+    repeated uint64 user_ids = 1;
+}
+
+message FuzzySearchUsers {
+    string query = 1;
+}
+
+message UsersResponse {
+    repeated User users = 1;
+}
+
+message RequestContact {
+    uint64 responder_id = 1;
+}
+
+message RemoveContact {
+    uint64 user_id = 1;
+}
+
+message RespondToContactRequest {
+    uint64 requester_id = 1;
+    ContactRequestResponse response = 2;
+}
+
+enum ContactRequestResponse {
+    Accept = 0;
+    Decline = 1;
+    Block = 2;
+    Dismiss = 3;
+}
+
+message UpdateContacts {
+    repeated Contact contacts = 1;
+    repeated uint64 remove_contacts = 2;
+    repeated IncomingContactRequest incoming_requests = 3;
+    repeated uint64 remove_incoming_requests = 4;
+    repeated uint64 outgoing_requests = 5;
+    repeated uint64 remove_outgoing_requests = 6;
+}
+
+message UpdateInviteInfo {
+    string url = 1;
+    uint32 count = 2;
+}
+
+message ShowContacts {}
+
+message IncomingContactRequest {
+    uint64 requester_id = 1;
+    bool should_notify = 2;
+}
+
+message UpdateDiagnostics {
+    uint32 replica_id = 1;
+    uint32 lamport_timestamp = 2;
+    uint64 server_id = 3;
+    repeated Diagnostic diagnostics = 4;
+}
+
+message Follow {
+    uint64 room_id = 1;
+    optional uint64 project_id = 2;
+    PeerId leader_id = 3;
+}
+
+message FollowResponse {
+    optional ViewId active_view_id = 1;
+    repeated View views = 2;
+}
+
+message UpdateFollowers {
+    uint64 room_id = 1;
+    optional uint64 project_id = 2;
+    repeated PeerId follower_ids = 3;
+    oneof variant {
+        UpdateActiveView update_active_view = 4;
+        View create_view = 5;
+        UpdateView update_view = 6;
+    }
+}
+
+message Unfollow {
+    uint64 room_id = 1;
+    optional uint64 project_id = 2;
+    PeerId leader_id = 3;
+}
+
+message GetPrivateUserInfo {}
+
+message GetPrivateUserInfoResponse {
+    string metrics_id = 1;
+    bool staff = 2;
+    repeated string flags = 3;
+}
+
+// Entities
+
+message ViewId {
+    PeerId creator = 1;
+    uint64 id = 2;
+}
+
+message UpdateActiveView {
+    optional ViewId id = 1;
+    optional PeerId leader_id = 2;
+}
+
+message UpdateView {
+    ViewId id = 1;
+    optional PeerId leader_id = 2;
+
+    oneof variant {
+        Editor editor = 3;
+    }
+
+    message Editor {
+        repeated ExcerptInsertion inserted_excerpts = 1;
+        repeated uint64 deleted_excerpts = 2;
+        repeated Selection selections = 3;
+        optional Selection pending_selection = 4;
+        EditorAnchor scroll_top_anchor = 5;
+        float scroll_x = 6;
+        float scroll_y = 7;
+    }
+}
+
+message View {
+    ViewId id = 1;
+    optional PeerId leader_id = 2;
+
+    oneof variant {
+        Editor editor = 3;
+        ChannelView channel_view = 4;
+    }
+
+    message Editor {
+        bool singleton = 1;
+        optional string title = 2;
+        repeated Excerpt excerpts = 3;
+        repeated Selection selections = 4;
+        optional Selection pending_selection = 5;
+        EditorAnchor scroll_top_anchor = 6;
+        float scroll_x = 7;
+        float scroll_y = 8;
+    }
+
+    message ChannelView {
+        uint64 channel_id = 1;
+        Editor editor = 2;
+    }
+}
+
+message Collaborator {
+    PeerId peer_id = 1;
+    uint32 replica_id = 2;
+    uint64 user_id = 3;
+}
+
+message User {
+    uint64 id = 1;
+    string github_login = 2;
+    string avatar_url = 3;
+}
+
+message File {
+    uint64 worktree_id = 1;
+    uint64 entry_id = 2;
+    string path = 3;
+    Timestamp mtime = 4;
+    bool is_deleted = 5;
+}
+
+message Entry {
+    uint64 id = 1;
+    bool is_dir = 2;
+    string path = 3;
+    uint64 inode = 4;
+    Timestamp mtime = 5;
+    bool is_symlink = 6;
+    bool is_ignored = 7;
+    bool is_external = 8;
+    optional GitStatus git_status = 9;
+}
+
+message RepositoryEntry {
+    uint64 work_directory_id = 1;
+    optional string branch = 2;
+}
+
+message StatusEntry {
+    string repo_path = 1;
+    GitStatus status = 2;
+}
+
+enum GitStatus {
+    Added = 0;
+    Modified = 1;
+    Conflict = 2;
+}
+
+message BufferState {
+    uint64 id = 1;
+    optional File file = 2;
+    string base_text = 3;
+    optional string diff_base = 4;
+    LineEnding line_ending = 5;
+    repeated VectorClockEntry saved_version = 6;
+    string saved_version_fingerprint = 7;
+    Timestamp saved_mtime = 8;
+}
+
+message BufferChunk {
+    uint64 buffer_id = 1;
+    repeated Operation operations = 2;
+    bool is_last = 3;
+}
+
+enum LineEnding {
+    Unix = 0;
+    Windows = 1;
+}
+
+message Selection {
+    uint64 id = 1;
+    EditorAnchor start = 2;
+    EditorAnchor end = 3;
+    bool reversed = 4;
+}
+
+message EditorAnchor {
+    uint64 excerpt_id = 1;
+    Anchor anchor = 2;
+}
+
+enum CursorShape {
+    CursorBar = 0;
+    CursorBlock = 1;
+    CursorUnderscore = 2;
+    CursorHollow = 3;
+}
+
+message ExcerptInsertion {
+    Excerpt excerpt = 1;
+    optional uint64 previous_excerpt_id = 2;
+}
+
+message Excerpt {
+    uint64 id = 1;
+    uint64 buffer_id = 2;
+    Anchor context_start = 3;
+    Anchor context_end = 4;
+    Anchor primary_start = 5;
+    Anchor primary_end = 6;
+}
+
+message Anchor {
+    uint32 replica_id = 1;
+    uint32 timestamp = 2;
+    uint64 offset = 3;
+    Bias bias = 4;
+    optional uint64 buffer_id = 5;
+}
+
+enum Bias {
+    Left = 0;
+    Right = 1;
+}
+
+message Diagnostic {
+    Anchor start = 1;
+    Anchor end = 2;
+    optional string source = 3;
+    Severity severity = 4;
+    string message = 5;
+    optional string code = 6;
+    uint64 group_id = 7;
+    bool is_primary = 8;
+    bool is_valid = 9;
+    bool is_disk_based = 10;
+    bool is_unnecessary = 11;
+
+    enum Severity {
+        None = 0;
+        Error = 1;
+        Warning = 2;
+        Information = 3;
+        Hint = 4;
+    }
+}
+
+message Operation {
+    oneof variant {
+        Edit edit = 1;
+        Undo undo = 2;
+        UpdateSelections update_selections = 3;
+        UpdateDiagnostics update_diagnostics = 4;
+        UpdateCompletionTriggers update_completion_triggers = 5;
+    }
+
+    message Edit {
+        uint32 replica_id = 1;
+        uint32 lamport_timestamp = 2;
+        repeated VectorClockEntry version = 3;
+        repeated Range ranges = 4;
+        repeated string new_text = 5;
+    }
+
+    message Undo {
+        uint32 replica_id = 1;
+        uint32 lamport_timestamp = 2;
+        repeated VectorClockEntry version = 3;
+        repeated UndoCount counts = 4;
+    }
+
+    message UpdateSelections {
+        uint32 replica_id = 1;
+        uint32 lamport_timestamp = 2;
+        repeated Selection selections = 3;
+        bool line_mode = 4;
+        CursorShape cursor_shape = 5;
+    }
+
+    message UpdateCompletionTriggers {
+        uint32 replica_id = 1;
+        uint32 lamport_timestamp = 2;
+        repeated string triggers = 3;
+    }
+}
+
+message UndoMapEntry {
+    uint32 replica_id = 1;
+    uint32 local_timestamp = 2;
+    repeated UndoCount counts = 3;
+}
+
+message UndoCount {
+    uint32 replica_id = 1;
+    uint32 lamport_timestamp = 2;
+    uint32 count = 3;
+}
+
+message VectorClockEntry {
+    uint32 replica_id = 1;
+    uint32 timestamp = 2;
+}
+
+message Timestamp {
+    uint64 seconds = 1;
+    uint32 nanos = 2;
+}
+
+message Range {
+    uint64 start = 1;
+    uint64 end = 2;
+}
+
+message PointUtf16 {
+    uint32 row = 1;
+    uint32 column = 2;
+}
+
+message Nonce {
+    uint64 upper_half = 1;
+    uint64 lower_half = 2;
+}
+
+message Channel {
+    uint64 id = 1;
+    string name = 2;
+}
+
+message Contact {
+    uint64 user_id = 1;
+    bool online = 2;
+    bool busy = 3;
+    bool should_notify = 4;
+}
+
+message WorktreeMetadata {
+    uint64 id = 1;
+    string root_name = 2;
+    bool visible = 3;
+    string abs_path = 4;
+}
+
+message UpdateDiffBase {
+    uint64 project_id = 1;
+    uint64 buffer_id = 2;
+    optional string diff_base = 3;
+}

crates/rpc2/src/auth.rs 🔗

@@ -0,0 +1,136 @@
+use anyhow::{Context, Result};
+use rand::{thread_rng, Rng as _};
+use rsa::{PublicKey as _, PublicKeyEncoding, RSAPrivateKey, RSAPublicKey};
+use std::convert::TryFrom;
+
+pub struct PublicKey(RSAPublicKey);
+
+pub struct PrivateKey(RSAPrivateKey);
+
+/// Generate a public and private key for asymmetric encryption.
+pub fn keypair() -> Result<(PublicKey, PrivateKey)> {
+    let mut rng = thread_rng();
+    let bits = 1024;
+    let private_key = RSAPrivateKey::new(&mut rng, bits)?;
+    let public_key = RSAPublicKey::from(&private_key);
+    Ok((PublicKey(public_key), PrivateKey(private_key)))
+}
+
+/// Generate a random 64-character base64 string.
+pub fn random_token() -> String {
+    let mut rng = thread_rng();
+    let mut token_bytes = [0; 48];
+    for byte in token_bytes.iter_mut() {
+        *byte = rng.gen();
+    }
+    base64::encode_config(token_bytes, base64::URL_SAFE)
+}
+
+impl PublicKey {
+    /// Convert a string to a base64-encoded string that can only be decoded with the corresponding
+    /// private key.
+    pub fn encrypt_string(&self, string: &str) -> Result<String> {
+        let mut rng = thread_rng();
+        let bytes = string.as_bytes();
+        let encrypted_bytes = self
+            .0
+            .encrypt(&mut rng, PADDING_SCHEME, bytes)
+            .context("failed to encrypt string with public key")?;
+        let encrypted_string = base64::encode_config(&encrypted_bytes, base64::URL_SAFE);
+        Ok(encrypted_string)
+    }
+}
+
+impl PrivateKey {
+    /// Decrypt a base64-encoded string that was encrypted by the corresponding public key.
+    pub fn decrypt_string(&self, encrypted_string: &str) -> Result<String> {
+        let encrypted_bytes = base64::decode_config(encrypted_string, base64::URL_SAFE)
+            .context("failed to base64-decode encrypted string")?;
+        let bytes = self
+            .0
+            .decrypt(PADDING_SCHEME, &encrypted_bytes)
+            .context("failed to decrypt string with private key")?;
+        let string = String::from_utf8(bytes).context("decrypted content was not valid utf8")?;
+        Ok(string)
+    }
+}
+
+impl TryFrom<PublicKey> for String {
+    type Error = anyhow::Error;
+    fn try_from(key: PublicKey) -> Result<Self> {
+        let bytes = key.0.to_pkcs1().context("failed to serialize public key")?;
+        let string = base64::encode_config(&bytes, base64::URL_SAFE);
+        Ok(string)
+    }
+}
+
+impl TryFrom<String> for PublicKey {
+    type Error = anyhow::Error;
+    fn try_from(value: String) -> Result<Self> {
+        let bytes = base64::decode_config(&value, base64::URL_SAFE)
+            .context("failed to base64-decode public key string")?;
+        let key = Self(RSAPublicKey::from_pkcs1(&bytes).context("failed to parse public key")?);
+        Ok(key)
+    }
+}
+
+const PADDING_SCHEME: rsa::PaddingScheme = rsa::PaddingScheme::PKCS1v15Encrypt;
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_generate_encrypt_and_decrypt_token() {
+        // CLIENT:
+        // * generate a keypair for asymmetric encryption
+        // * serialize the public key to send it to the server.
+        let (public, private) = keypair().unwrap();
+        let public_string = String::try_from(public).unwrap();
+        assert_printable(&public_string);
+
+        // SERVER:
+        // * parse the public key
+        // * generate a random token.
+        // * encrypt the token using the public key.
+        let public = PublicKey::try_from(public_string).unwrap();
+        let token = random_token();
+        let encrypted_token = public.encrypt_string(&token).unwrap();
+        assert_eq!(token.len(), 64);
+        assert_ne!(encrypted_token, token);
+        assert_printable(&token);
+        assert_printable(&encrypted_token);
+
+        // CLIENT:
+        // * decrypt the token using the private key.
+        let decrypted_token = private.decrypt_string(&encrypted_token).unwrap();
+        assert_eq!(decrypted_token, token);
+    }
+
+    #[test]
+    fn test_tokens_are_always_url_safe() {
+        for _ in 0..5 {
+            let token = random_token();
+            let (public_key, _) = keypair().unwrap();
+            let encrypted_token = public_key.encrypt_string(&token).unwrap();
+            let public_key_str = String::try_from(public_key).unwrap();
+
+            assert_printable(&token);
+            assert_printable(&public_key_str);
+            assert_printable(&encrypted_token);
+        }
+    }
+
+    fn assert_printable(token: &str) {
+        for c in token.chars() {
+            assert!(
+                c.is_ascii_graphic(),
+                "token {:?} has non-printable char {}",
+                token,
+                c
+            );
+            assert_ne!(c, '/', "token {:?} is not URL-safe", token);
+            assert_ne!(c, '&', "token {:?} is not URL-safe", token);
+        }
+    }
+}

crates/rpc2/src/conn.rs 🔗

@@ -0,0 +1,108 @@
+use async_tungstenite::tungstenite::Message as WebSocketMessage;
+use futures::{SinkExt as _, StreamExt as _};
+
+pub struct Connection {
+    pub(crate) tx:
+        Box<dyn 'static + Send + Unpin + futures::Sink<WebSocketMessage, Error = anyhow::Error>>,
+    pub(crate) rx: Box<
+        dyn 'static
+            + Send
+            + Unpin
+            + futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>>,
+    >,
+}
+
+impl Connection {
+    pub fn new<S>(stream: S) -> Self
+    where
+        S: 'static
+            + Send
+            + Unpin
+            + futures::Sink<WebSocketMessage, Error = anyhow::Error>
+            + futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>>,
+    {
+        let (tx, rx) = stream.split();
+        Self {
+            tx: Box::new(tx),
+            rx: Box::new(rx),
+        }
+    }
+
+    pub async fn send(&mut self, message: WebSocketMessage) -> Result<(), anyhow::Error> {
+        self.tx.send(message).await
+    }
+
+    #[cfg(any(test, feature = "test-support"))]
+    pub fn in_memory(
+        executor: gpui2::Executor,
+    ) -> (Self, Self, std::sync::Arc<std::sync::atomic::AtomicBool>) {
+        use std::sync::{
+            atomic::{AtomicBool, Ordering::SeqCst},
+            Arc,
+        };
+
+        let killed = Arc::new(AtomicBool::new(false));
+        let (a_tx, a_rx) = channel(killed.clone(), executor.clone());
+        let (b_tx, b_rx) = channel(killed.clone(), executor);
+        return (
+            Self { tx: a_tx, rx: b_rx },
+            Self { tx: b_tx, rx: a_rx },
+            killed,
+        );
+
+        #[allow(clippy::type_complexity)]
+        fn channel(
+            killed: Arc<AtomicBool>,
+            executor: gpui2::Executor,
+        ) -> (
+            Box<dyn Send + Unpin + futures::Sink<WebSocketMessage, Error = anyhow::Error>>,
+            Box<dyn Send + Unpin + futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>>>,
+        ) {
+            use anyhow::anyhow;
+            use futures::channel::mpsc;
+            use std::io::{Error, ErrorKind};
+
+            let (tx, rx) = mpsc::unbounded::<WebSocketMessage>();
+
+            let tx = tx.sink_map_err(|error| anyhow!(error)).with({
+                let killed = killed.clone();
+                let executor = executor.clone();
+                move |msg| {
+                    let killed = killed.clone();
+                    let executor = executor.clone();
+                    Box::pin(async move {
+                        executor.simulate_random_delay().await;
+
+                        // Writes to a half-open TCP connection will error.
+                        if killed.load(SeqCst) {
+                            std::io::Result::Err(Error::new(ErrorKind::Other, "connection lost"))?;
+                        }
+
+                        Ok(msg)
+                    })
+                }
+            });
+
+            let rx = rx.then({
+                let killed = killed;
+                let executor = executor.clone();
+                move |msg| {
+                    let killed = killed.clone();
+                    let executor = executor.clone();
+                    Box::pin(async move {
+                        executor.simulate_random_delay().await;
+
+                        // Reads from a half-open TCP connection will hang.
+                        if killed.load(SeqCst) {
+                            futures::future::pending::<()>().await;
+                        }
+
+                        Ok(msg)
+                    })
+                }
+            });
+
+            (Box::new(tx), Box::new(rx))
+        }
+    }
+}

crates/rpc2/src/macros.rs 🔗

@@ -0,0 +1,70 @@
+#[macro_export]
+macro_rules! messages {
+    ($(($name:ident, $priority:ident)),* $(,)?) => {
+        pub fn build_typed_envelope(sender_id: ConnectionId, envelope: Envelope) -> Option<Box<dyn AnyTypedEnvelope>> {
+            match envelope.payload {
+                $(Some(envelope::Payload::$name(payload)) => {
+                    Some(Box::new(TypedEnvelope {
+                        sender_id,
+                        original_sender_id: envelope.original_sender_id.map(|original_sender| PeerId {
+                            owner_id: original_sender.owner_id,
+                            id: original_sender.id
+                        }),
+                        message_id: envelope.id,
+                        payload,
+                    }))
+                }, )*
+                _ => None
+            }
+        }
+
+        $(
+            impl EnvelopedMessage for $name {
+                const NAME: &'static str = std::stringify!($name);
+                const PRIORITY: MessagePriority = MessagePriority::$priority;
+
+                fn into_envelope(
+                    self,
+                    id: u32,
+                    responding_to: Option<u32>,
+                    original_sender_id: Option<PeerId>,
+                ) -> Envelope {
+                    Envelope {
+                        id,
+                        responding_to,
+                        original_sender_id,
+                        payload: Some(envelope::Payload::$name(self)),
+                    }
+                }
+
+                fn from_envelope(envelope: Envelope) -> Option<Self> {
+                    if let Some(envelope::Payload::$name(msg)) = envelope.payload {
+                        Some(msg)
+                    } else {
+                        None
+                    }
+                }
+            }
+        )*
+    };
+}
+
+#[macro_export]
+macro_rules! request_messages {
+    ($(($request_name:ident, $response_name:ident)),* $(,)?) => {
+        $(impl RequestMessage for $request_name {
+            type Response = $response_name;
+        })*
+    };
+}
+
+#[macro_export]
+macro_rules! entity_messages {
+    ($id_field:ident, $($name:ident),* $(,)?) => {
+        $(impl EntityMessage for $name {
+            fn remote_entity_id(&self) -> u64 {
+                self.$id_field
+            }
+        })*
+    };
+}

crates/rpc2/src/peer.rs 🔗

@@ -0,0 +1,933 @@
+use super::{
+    proto::{self, AnyTypedEnvelope, EnvelopedMessage, MessageStream, PeerId, RequestMessage},
+    Connection,
+};
+use anyhow::{anyhow, Context, Result};
+use collections::HashMap;
+use futures::{
+    channel::{mpsc, oneshot},
+    stream::BoxStream,
+    FutureExt, SinkExt, StreamExt, TryFutureExt,
+};
+use parking_lot::{Mutex, RwLock};
+use serde::{ser::SerializeStruct, Serialize};
+use std::{fmt, sync::atomic::Ordering::SeqCst};
+use std::{
+    future::Future,
+    marker::PhantomData,
+    sync::{
+        atomic::{self, AtomicU32},
+        Arc,
+    },
+    time::Duration,
+};
+use tracing::instrument;
+
+#[derive(Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord, Hash, Debug, Serialize)]
+pub struct ConnectionId {
+    pub owner_id: u32,
+    pub id: u32,
+}
+
+impl Into<PeerId> for ConnectionId {
+    fn into(self) -> PeerId {
+        PeerId {
+            owner_id: self.owner_id,
+            id: self.id,
+        }
+    }
+}
+
+impl From<PeerId> for ConnectionId {
+    fn from(peer_id: PeerId) -> Self {
+        Self {
+            owner_id: peer_id.owner_id,
+            id: peer_id.id,
+        }
+    }
+}
+
+impl fmt::Display for ConnectionId {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{}/{}", self.owner_id, self.id)
+    }
+}
+
+pub struct Receipt<T> {
+    pub sender_id: ConnectionId,
+    pub message_id: u32,
+    payload_type: PhantomData<T>,
+}
+
+impl<T> Clone for Receipt<T> {
+    fn clone(&self) -> Self {
+        Self {
+            sender_id: self.sender_id,
+            message_id: self.message_id,
+            payload_type: PhantomData,
+        }
+    }
+}
+
+impl<T> Copy for Receipt<T> {}
+
+#[derive(Clone, Debug)]
+pub struct TypedEnvelope<T> {
+    pub sender_id: ConnectionId,
+    pub original_sender_id: Option<PeerId>,
+    pub message_id: u32,
+    pub payload: T,
+}
+
+impl<T> TypedEnvelope<T> {
+    pub fn original_sender_id(&self) -> Result<PeerId> {
+        self.original_sender_id
+            .ok_or_else(|| anyhow!("missing original_sender_id"))
+    }
+}
+
+impl<T: RequestMessage> TypedEnvelope<T> {
+    pub fn receipt(&self) -> Receipt<T> {
+        Receipt {
+            sender_id: self.sender_id,
+            message_id: self.message_id,
+            payload_type: PhantomData,
+        }
+    }
+}
+
+pub struct Peer {
+    epoch: AtomicU32,
+    pub connections: RwLock<HashMap<ConnectionId, ConnectionState>>,
+    next_connection_id: AtomicU32,
+}
+
+#[derive(Clone, Serialize)]
+pub struct ConnectionState {
+    #[serde(skip)]
+    outgoing_tx: mpsc::UnboundedSender<proto::Message>,
+    next_message_id: Arc<AtomicU32>,
+    #[allow(clippy::type_complexity)]
+    #[serde(skip)]
+    response_channels:
+        Arc<Mutex<Option<HashMap<u32, oneshot::Sender<(proto::Envelope, oneshot::Sender<()>)>>>>>,
+}
+
+const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1);
+const WRITE_TIMEOUT: Duration = Duration::from_secs(2);
+pub const RECEIVE_TIMEOUT: Duration = Duration::from_secs(10);
+
+impl Peer {
+    pub fn new(epoch: u32) -> Arc<Self> {
+        Arc::new(Self {
+            epoch: AtomicU32::new(epoch),
+            connections: Default::default(),
+            next_connection_id: Default::default(),
+        })
+    }
+
+    pub fn epoch(&self) -> u32 {
+        self.epoch.load(SeqCst)
+    }
+
+    #[instrument(skip_all)]
+    pub fn add_connection<F, Fut, Out>(
+        self: &Arc<Self>,
+        connection: Connection,
+        create_timer: F,
+    ) -> (
+        ConnectionId,
+        impl Future<Output = anyhow::Result<()>> + Send,
+        BoxStream<'static, Box<dyn AnyTypedEnvelope>>,
+    )
+    where
+        F: Send + Fn(Duration) -> Fut,
+        Fut: Send + Future<Output = Out>,
+        Out: Send,
+    {
+        // For outgoing messages, use an unbounded channel so that application code
+        // can always send messages without yielding. For incoming messages, use a
+        // bounded channel so that other peers will receive backpressure if they send
+        // messages faster than this peer can process them.
+        #[cfg(any(test, feature = "test-support"))]
+        const INCOMING_BUFFER_SIZE: usize = 1;
+        #[cfg(not(any(test, feature = "test-support")))]
+        const INCOMING_BUFFER_SIZE: usize = 64;
+        let (mut incoming_tx, incoming_rx) = mpsc::channel(INCOMING_BUFFER_SIZE);
+        let (outgoing_tx, mut outgoing_rx) = mpsc::unbounded();
+
+        let connection_id = ConnectionId {
+            owner_id: self.epoch.load(SeqCst),
+            id: self.next_connection_id.fetch_add(1, SeqCst),
+        };
+        let connection_state = ConnectionState {
+            outgoing_tx,
+            next_message_id: Default::default(),
+            response_channels: Arc::new(Mutex::new(Some(Default::default()))),
+        };
+        let mut writer = MessageStream::new(connection.tx);
+        let mut reader = MessageStream::new(connection.rx);
+
+        let this = self.clone();
+        let response_channels = connection_state.response_channels.clone();
+        let handle_io = async move {
+            tracing::trace!(%connection_id, "handle io future: start");
+
+            let _end_connection = util::defer(|| {
+                response_channels.lock().take();
+                this.connections.write().remove(&connection_id);
+                tracing::trace!(%connection_id, "handle io future: end");
+            });
+
+            // Send messages on this frequency so the connection isn't closed.
+            let keepalive_timer = create_timer(KEEPALIVE_INTERVAL).fuse();
+            futures::pin_mut!(keepalive_timer);
+
+            // Disconnect if we don't receive messages at least this frequently.
+            let receive_timeout = create_timer(RECEIVE_TIMEOUT).fuse();
+            futures::pin_mut!(receive_timeout);
+
+            loop {
+                tracing::trace!(%connection_id, "outer loop iteration start");
+                let read_message = reader.read().fuse();
+                futures::pin_mut!(read_message);
+
+                loop {
+                    tracing::trace!(%connection_id, "inner loop iteration start");
+                    futures::select_biased! {
+                        outgoing = outgoing_rx.next().fuse() => match outgoing {
+                            Some(outgoing) => {
+                                tracing::trace!(%connection_id, "outgoing rpc message: writing");
+                                futures::select_biased! {
+                                    result = writer.write(outgoing).fuse() => {
+                                        tracing::trace!(%connection_id, "outgoing rpc message: done writing");
+                                        result.context("failed to write RPC message")?;
+                                        tracing::trace!(%connection_id, "keepalive interval: resetting after sending message");
+                                        keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse());
+                                    }
+                                    _ = create_timer(WRITE_TIMEOUT).fuse() => {
+                                        tracing::trace!(%connection_id, "outgoing rpc message: writing timed out");
+                                        Err(anyhow!("timed out writing message"))?;
+                                    }
+                                }
+                            }
+                            None => {
+                                tracing::trace!(%connection_id, "outgoing rpc message: channel closed");
+                                return Ok(())
+                            },
+                        },
+                        _ = keepalive_timer => {
+                            tracing::trace!(%connection_id, "keepalive interval: pinging");
+                            futures::select_biased! {
+                                result = writer.write(proto::Message::Ping).fuse() => {
+                                    tracing::trace!(%connection_id, "keepalive interval: done pinging");
+                                    result.context("failed to send keepalive")?;
+                                    tracing::trace!(%connection_id, "keepalive interval: resetting after pinging");
+                                    keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse());
+                                }
+                                _ = create_timer(WRITE_TIMEOUT).fuse() => {
+                                    tracing::trace!(%connection_id, "keepalive interval: pinging timed out");
+                                    Err(anyhow!("timed out sending keepalive"))?;
+                                }
+                            }
+                        }
+                        incoming = read_message => {
+                            let incoming = incoming.context("error reading rpc message from socket")?;
+                            tracing::trace!(%connection_id, "incoming rpc message: received");
+                            tracing::trace!(%connection_id, "receive timeout: resetting");
+                            receive_timeout.set(create_timer(RECEIVE_TIMEOUT).fuse());
+                            if let proto::Message::Envelope(incoming) = incoming {
+                                tracing::trace!(%connection_id, "incoming rpc message: processing");
+                                futures::select_biased! {
+                                    result = incoming_tx.send(incoming).fuse() => match result {
+                                        Ok(_) => {
+                                            tracing::trace!(%connection_id, "incoming rpc message: processed");
+                                        }
+                                        Err(_) => {
+                                            tracing::trace!(%connection_id, "incoming rpc message: channel closed");
+                                            return Ok(())
+                                        }
+                                    },
+                                    _ = create_timer(WRITE_TIMEOUT).fuse() => {
+                                        tracing::trace!(%connection_id, "incoming rpc message: processing timed out");
+                                        Err(anyhow!("timed out processing incoming message"))?
+                                    }
+                                }
+                            }
+                            break;
+                        },
+                        _ = receive_timeout => {
+                            tracing::trace!(%connection_id, "receive timeout: delay between messages too long");
+                            Err(anyhow!("delay between messages too long"))?
+                        }
+                    }
+                }
+            }
+        };
+
+        let response_channels = connection_state.response_channels.clone();
+        self.connections
+            .write()
+            .insert(connection_id, connection_state);
+
+        let incoming_rx = incoming_rx.filter_map(move |incoming| {
+            let response_channels = response_channels.clone();
+            async move {
+                let message_id = incoming.id;
+                tracing::trace!(?incoming, "incoming message future: start");
+                let _end = util::defer(move || {
+                    tracing::trace!(%connection_id, message_id, "incoming message future: end");
+                });
+
+                if let Some(responding_to) = incoming.responding_to {
+                    tracing::trace!(
+                        %connection_id,
+                        message_id,
+                        responding_to,
+                        "incoming response: received"
+                    );
+                    let channel = response_channels.lock().as_mut()?.remove(&responding_to);
+                    if let Some(tx) = channel {
+                        let requester_resumed = oneshot::channel();
+                        if let Err(error) = tx.send((incoming, requester_resumed.0)) {
+                            tracing::trace!(
+                                %connection_id,
+                                message_id,
+                                responding_to = responding_to,
+                                ?error,
+                                "incoming response: request future dropped",
+                            );
+                        }
+
+                        tracing::trace!(
+                            %connection_id,
+                            message_id,
+                            responding_to,
+                            "incoming response: waiting to resume requester"
+                        );
+                        let _ = requester_resumed.1.await;
+                        tracing::trace!(
+                            %connection_id,
+                            message_id,
+                            responding_to,
+                            "incoming response: requester resumed"
+                        );
+                    } else {
+                        tracing::warn!(
+                            %connection_id,
+                            message_id,
+                            responding_to,
+                            "incoming response: unknown request"
+                        );
+                    }
+
+                    None
+                } else {
+                    tracing::trace!(%connection_id, message_id, "incoming message: received");
+                    proto::build_typed_envelope(connection_id, incoming).or_else(|| {
+                        tracing::error!(
+                            %connection_id,
+                            message_id,
+                            "unable to construct a typed envelope"
+                        );
+                        None
+                    })
+                }
+            }
+        });
+        (connection_id, handle_io, incoming_rx.boxed())
+    }
+
+    #[cfg(any(test, feature = "test-support"))]
+    pub fn add_test_connection(
+        self: &Arc<Self>,
+        connection: Connection,
+        executor: gpui2::Executor,
+    ) -> (
+        ConnectionId,
+        impl Future<Output = anyhow::Result<()>> + Send,
+        BoxStream<'static, Box<dyn AnyTypedEnvelope>>,
+    ) {
+        let executor = executor.clone();
+        self.add_connection(connection, move |duration| executor.timer(duration))
+    }
+
+    pub fn disconnect(&self, connection_id: ConnectionId) {
+        self.connections.write().remove(&connection_id);
+    }
+
+    pub fn reset(&self, epoch: u32) {
+        self.teardown();
+        self.next_connection_id.store(0, SeqCst);
+        self.epoch.store(epoch, SeqCst);
+    }
+
+    pub fn teardown(&self) {
+        self.connections.write().clear();
+    }
+
+    pub fn request<T: RequestMessage>(
+        &self,
+        receiver_id: ConnectionId,
+        request: T,
+    ) -> impl Future<Output = Result<T::Response>> {
+        self.request_internal(None, receiver_id, request)
+            .map_ok(|envelope| envelope.payload)
+    }
+
+    pub fn request_envelope<T: RequestMessage>(
+        &self,
+        receiver_id: ConnectionId,
+        request: T,
+    ) -> impl Future<Output = Result<TypedEnvelope<T::Response>>> {
+        self.request_internal(None, receiver_id, request)
+    }
+
+    pub fn forward_request<T: RequestMessage>(
+        &self,
+        sender_id: ConnectionId,
+        receiver_id: ConnectionId,
+        request: T,
+    ) -> impl Future<Output = Result<T::Response>> {
+        self.request_internal(Some(sender_id), receiver_id, request)
+            .map_ok(|envelope| envelope.payload)
+    }
+
+    pub fn request_internal<T: RequestMessage>(
+        &self,
+        original_sender_id: Option<ConnectionId>,
+        receiver_id: ConnectionId,
+        request: T,
+    ) -> impl Future<Output = Result<TypedEnvelope<T::Response>>> {
+        let (tx, rx) = oneshot::channel();
+        let send = self.connection_state(receiver_id).and_then(|connection| {
+            let message_id = connection.next_message_id.fetch_add(1, SeqCst);
+            connection
+                .response_channels
+                .lock()
+                .as_mut()
+                .ok_or_else(|| anyhow!("connection was closed"))?
+                .insert(message_id, tx);
+            connection
+                .outgoing_tx
+                .unbounded_send(proto::Message::Envelope(request.into_envelope(
+                    message_id,
+                    None,
+                    original_sender_id.map(Into::into),
+                )))
+                .map_err(|_| anyhow!("connection was closed"))?;
+            Ok(())
+        });
+        async move {
+            send?;
+            let (response, _barrier) = rx.await.map_err(|_| anyhow!("connection was closed"))?;
+
+            if let Some(proto::envelope::Payload::Error(error)) = &response.payload {
+                Err(anyhow!(
+                    "RPC request {} failed - {}",
+                    T::NAME,
+                    error.message
+                ))
+            } else {
+                Ok(TypedEnvelope {
+                    message_id: response.id,
+                    sender_id: receiver_id,
+                    original_sender_id: response.original_sender_id,
+                    payload: T::Response::from_envelope(response)
+                        .ok_or_else(|| anyhow!("received response of the wrong type"))?,
+                })
+            }
+        }
+    }
+
+    pub fn send<T: EnvelopedMessage>(&self, receiver_id: ConnectionId, message: T) -> Result<()> {
+        let connection = self.connection_state(receiver_id)?;
+        let message_id = connection
+            .next_message_id
+            .fetch_add(1, atomic::Ordering::SeqCst);
+        connection
+            .outgoing_tx
+            .unbounded_send(proto::Message::Envelope(
+                message.into_envelope(message_id, None, None),
+            ))?;
+        Ok(())
+    }
+
+    pub fn forward_send<T: EnvelopedMessage>(
+        &self,
+        sender_id: ConnectionId,
+        receiver_id: ConnectionId,
+        message: T,
+    ) -> Result<()> {
+        let connection = self.connection_state(receiver_id)?;
+        let message_id = connection
+            .next_message_id
+            .fetch_add(1, atomic::Ordering::SeqCst);
+        connection
+            .outgoing_tx
+            .unbounded_send(proto::Message::Envelope(message.into_envelope(
+                message_id,
+                None,
+                Some(sender_id.into()),
+            )))?;
+        Ok(())
+    }
+
+    pub fn respond<T: RequestMessage>(
+        &self,
+        receipt: Receipt<T>,
+        response: T::Response,
+    ) -> Result<()> {
+        let connection = self.connection_state(receipt.sender_id)?;
+        let message_id = connection
+            .next_message_id
+            .fetch_add(1, atomic::Ordering::SeqCst);
+        connection
+            .outgoing_tx
+            .unbounded_send(proto::Message::Envelope(response.into_envelope(
+                message_id,
+                Some(receipt.message_id),
+                None,
+            )))?;
+        Ok(())
+    }
+
+    pub fn respond_with_error<T: RequestMessage>(
+        &self,
+        receipt: Receipt<T>,
+        response: proto::Error,
+    ) -> Result<()> {
+        let connection = self.connection_state(receipt.sender_id)?;
+        let message_id = connection
+            .next_message_id
+            .fetch_add(1, atomic::Ordering::SeqCst);
+        connection
+            .outgoing_tx
+            .unbounded_send(proto::Message::Envelope(response.into_envelope(
+                message_id,
+                Some(receipt.message_id),
+                None,
+            )))?;
+        Ok(())
+    }
+
+    pub fn respond_with_unhandled_message(
+        &self,
+        envelope: Box<dyn AnyTypedEnvelope>,
+    ) -> Result<()> {
+        let connection = self.connection_state(envelope.sender_id())?;
+        let response = proto::Error {
+            message: format!("message {} was not handled", envelope.payload_type_name()),
+        };
+        let message_id = connection
+            .next_message_id
+            .fetch_add(1, atomic::Ordering::SeqCst);
+        connection
+            .outgoing_tx
+            .unbounded_send(proto::Message::Envelope(response.into_envelope(
+                message_id,
+                Some(envelope.message_id()),
+                None,
+            )))?;
+        Ok(())
+    }
+
+    fn connection_state(&self, connection_id: ConnectionId) -> Result<ConnectionState> {
+        let connections = self.connections.read();
+        let connection = connections
+            .get(&connection_id)
+            .ok_or_else(|| anyhow!("no such connection: {}", connection_id))?;
+        Ok(connection.clone())
+    }
+}
+
+impl Serialize for Peer {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        let mut state = serializer.serialize_struct("Peer", 2)?;
+        state.serialize_field("connections", &*self.connections.read())?;
+        state.end()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::TypedEnvelope;
+    use async_tungstenite::tungstenite::Message as WebSocketMessage;
+    use gpui2::TestAppContext;
+
+    #[ctor::ctor]
+    fn init_logger() {
+        if std::env::var("RUST_LOG").is_ok() {
+            env_logger::init();
+        }
+    }
+
+    #[gpui2::test(iterations = 50)]
+    async fn test_request_response(cx: &mut TestAppContext) {
+        let executor = cx.executor();
+
+        // create 2 clients connected to 1 server
+        let server = Peer::new(0);
+        let client1 = Peer::new(0);
+        let client2 = Peer::new(0);
+
+        let (client1_to_server_conn, server_to_client_1_conn, _kill) =
+            Connection::in_memory(cx.executor().clone());
+        let (client1_conn_id, io_task1, client1_incoming) =
+            client1.add_test_connection(client1_to_server_conn, cx.executor().clone());
+        let (_, io_task2, server_incoming1) =
+            server.add_test_connection(server_to_client_1_conn, cx.executor().clone());
+
+        let (client2_to_server_conn, server_to_client_2_conn, _kill) =
+            Connection::in_memory(cx.executor().clone());
+        let (client2_conn_id, io_task3, client2_incoming) =
+            client2.add_test_connection(client2_to_server_conn, cx.executor().clone());
+        let (_, io_task4, server_incoming2) =
+            server.add_test_connection(server_to_client_2_conn, cx.executor().clone());
+
+        executor.spawn(io_task1).detach();
+        executor.spawn(io_task2).detach();
+        executor.spawn(io_task3).detach();
+        executor.spawn(io_task4).detach();
+        executor
+            .spawn(handle_messages(server_incoming1, server.clone()))
+            .detach();
+        executor
+            .spawn(handle_messages(client1_incoming, client1.clone()))
+            .detach();
+        executor
+            .spawn(handle_messages(server_incoming2, server.clone()))
+            .detach();
+        executor
+            .spawn(handle_messages(client2_incoming, client2.clone()))
+            .detach();
+
+        assert_eq!(
+            client1
+                .request(client1_conn_id, proto::Ping {},)
+                .await
+                .unwrap(),
+            proto::Ack {}
+        );
+
+        assert_eq!(
+            client2
+                .request(client2_conn_id, proto::Ping {},)
+                .await
+                .unwrap(),
+            proto::Ack {}
+        );
+
+        assert_eq!(
+            client1
+                .request(client1_conn_id, proto::Test { id: 1 },)
+                .await
+                .unwrap(),
+            proto::Test { id: 1 }
+        );
+
+        assert_eq!(
+            client2
+                .request(client2_conn_id, proto::Test { id: 2 })
+                .await
+                .unwrap(),
+            proto::Test { id: 2 }
+        );
+
+        client1.disconnect(client1_conn_id);
+        client2.disconnect(client1_conn_id);
+
+        async fn handle_messages(
+            mut messages: BoxStream<'static, Box<dyn AnyTypedEnvelope>>,
+            peer: Arc<Peer>,
+        ) -> Result<()> {
+            while let Some(envelope) = messages.next().await {
+                let envelope = envelope.into_any();
+                if let Some(envelope) = envelope.downcast_ref::<TypedEnvelope<proto::Ping>>() {
+                    let receipt = envelope.receipt();
+                    peer.respond(receipt, proto::Ack {})?
+                } else if let Some(envelope) = envelope.downcast_ref::<TypedEnvelope<proto::Test>>()
+                {
+                    peer.respond(envelope.receipt(), envelope.payload.clone())?
+                } else {
+                    panic!("unknown message type");
+                }
+            }
+
+            Ok(())
+        }
+    }
+
+    #[gpui2::test(iterations = 50)]
+    async fn test_order_of_response_and_incoming(cx: &mut TestAppContext) {
+        let executor = cx.executor();
+        let server = Peer::new(0);
+        let client = Peer::new(0);
+
+        let (client_to_server_conn, server_to_client_conn, _kill) =
+            Connection::in_memory(executor.clone());
+        let (client_to_server_conn_id, io_task1, mut client_incoming) =
+            client.add_test_connection(client_to_server_conn, executor.clone());
+
+        let (server_to_client_conn_id, io_task2, mut server_incoming) =
+            server.add_test_connection(server_to_client_conn, executor.clone());
+
+        executor.spawn(io_task1).detach();
+        executor.spawn(io_task2).detach();
+
+        executor
+            .spawn(async move {
+                let future = server_incoming.next().await;
+                let request = future
+                    .unwrap()
+                    .into_any()
+                    .downcast::<TypedEnvelope<proto::Ping>>()
+                    .unwrap();
+
+                server
+                    .send(
+                        server_to_client_conn_id,
+                        proto::Error {
+                            message: "message 1".to_string(),
+                        },
+                    )
+                    .unwrap();
+                server
+                    .send(
+                        server_to_client_conn_id,
+                        proto::Error {
+                            message: "message 2".to_string(),
+                        },
+                    )
+                    .unwrap();
+                server.respond(request.receipt(), proto::Ack {}).unwrap();
+
+                // Prevent the connection from being dropped
+                server_incoming.next().await;
+            })
+            .detach();
+
+        let events = Arc::new(Mutex::new(Vec::new()));
+
+        let response = client.request(client_to_server_conn_id, proto::Ping {});
+        let response_task = executor.spawn({
+            let events = events.clone();
+            async move {
+                response.await.unwrap();
+                events.lock().push("response".to_string());
+            }
+        });
+
+        executor
+            .spawn({
+                let events = events.clone();
+                async move {
+                    let incoming1 = client_incoming
+                        .next()
+                        .await
+                        .unwrap()
+                        .into_any()
+                        .downcast::<TypedEnvelope<proto::Error>>()
+                        .unwrap();
+                    events.lock().push(incoming1.payload.message);
+                    let incoming2 = client_incoming
+                        .next()
+                        .await
+                        .unwrap()
+                        .into_any()
+                        .downcast::<TypedEnvelope<proto::Error>>()
+                        .unwrap();
+                    events.lock().push(incoming2.payload.message);
+
+                    // Prevent the connection from being dropped
+                    client_incoming.next().await;
+                }
+            })
+            .detach();
+
+        response_task.await;
+        assert_eq!(
+            &*events.lock(),
+            &[
+                "message 1".to_string(),
+                "message 2".to_string(),
+                "response".to_string()
+            ]
+        );
+    }
+
+    #[gpui2::test(iterations = 50)]
+    async fn test_dropping_request_before_completion(cx: &mut TestAppContext) {
+        let executor = cx.executor().clone();
+        let server = Peer::new(0);
+        let client = Peer::new(0);
+
+        let (client_to_server_conn, server_to_client_conn, _kill) =
+            Connection::in_memory(cx.executor().clone());
+        let (client_to_server_conn_id, io_task1, mut client_incoming) =
+            client.add_test_connection(client_to_server_conn, cx.executor().clone());
+        let (server_to_client_conn_id, io_task2, mut server_incoming) =
+            server.add_test_connection(server_to_client_conn, cx.executor().clone());
+
+        executor.spawn(io_task1).detach();
+        executor.spawn(io_task2).detach();
+
+        executor
+            .spawn(async move {
+                let request1 = server_incoming
+                    .next()
+                    .await
+                    .unwrap()
+                    .into_any()
+                    .downcast::<TypedEnvelope<proto::Ping>>()
+                    .unwrap();
+                let request2 = server_incoming
+                    .next()
+                    .await
+                    .unwrap()
+                    .into_any()
+                    .downcast::<TypedEnvelope<proto::Ping>>()
+                    .unwrap();
+
+                server
+                    .send(
+                        server_to_client_conn_id,
+                        proto::Error {
+                            message: "message 1".to_string(),
+                        },
+                    )
+                    .unwrap();
+                server
+                    .send(
+                        server_to_client_conn_id,
+                        proto::Error {
+                            message: "message 2".to_string(),
+                        },
+                    )
+                    .unwrap();
+                server.respond(request1.receipt(), proto::Ack {}).unwrap();
+                server.respond(request2.receipt(), proto::Ack {}).unwrap();
+
+                // Prevent the connection from being dropped
+                server_incoming.next().await;
+            })
+            .detach();
+
+        let events = Arc::new(Mutex::new(Vec::new()));
+
+        let request1 = client.request(client_to_server_conn_id, proto::Ping {});
+        let request1_task = executor.spawn(request1);
+        let request2 = client.request(client_to_server_conn_id, proto::Ping {});
+        let request2_task = executor.spawn({
+            let events = events.clone();
+            async move {
+                request2.await.unwrap();
+                events.lock().push("response 2".to_string());
+            }
+        });
+
+        executor
+            .spawn({
+                let events = events.clone();
+                async move {
+                    let incoming1 = client_incoming
+                        .next()
+                        .await
+                        .unwrap()
+                        .into_any()
+                        .downcast::<TypedEnvelope<proto::Error>>()
+                        .unwrap();
+                    events.lock().push(incoming1.payload.message);
+                    let incoming2 = client_incoming
+                        .next()
+                        .await
+                        .unwrap()
+                        .into_any()
+                        .downcast::<TypedEnvelope<proto::Error>>()
+                        .unwrap();
+                    events.lock().push(incoming2.payload.message);
+
+                    // Prevent the connection from being dropped
+                    client_incoming.next().await;
+                }
+            })
+            .detach();
+
+        // Allow the request to make some progress before dropping it.
+        cx.executor().simulate_random_delay().await;
+        drop(request1_task);
+
+        request2_task.await;
+        assert_eq!(
+            &*events.lock(),
+            &[
+                "message 1".to_string(),
+                "message 2".to_string(),
+                "response 2".to_string()
+            ]
+        );
+    }
+
+    #[gpui2::test(iterations = 50)]
+    async fn test_disconnect(cx: &mut TestAppContext) {
+        let executor = cx.executor();
+
+        let (client_conn, mut server_conn, _kill) = Connection::in_memory(executor.clone());
+
+        let client = Peer::new(0);
+        let (connection_id, io_handler, mut incoming) =
+            client.add_test_connection(client_conn, executor.clone());
+
+        let (io_ended_tx, io_ended_rx) = oneshot::channel();
+        executor
+            .spawn(async move {
+                io_handler.await.ok();
+                io_ended_tx.send(()).unwrap();
+            })
+            .detach();
+
+        let (messages_ended_tx, messages_ended_rx) = oneshot::channel();
+        executor
+            .spawn(async move {
+                incoming.next().await;
+                messages_ended_tx.send(()).unwrap();
+            })
+            .detach();
+
+        client.disconnect(connection_id);
+
+        let _ = io_ended_rx.await;
+        let _ = messages_ended_rx.await;
+        assert!(server_conn
+            .send(WebSocketMessage::Binary(vec![]))
+            .await
+            .is_err());
+    }
+
+    #[gpui2::test(iterations = 50)]
+    async fn test_io_error(cx: &mut TestAppContext) {
+        let executor = cx.executor();
+        let (client_conn, mut server_conn, _kill) = Connection::in_memory(executor.clone());
+
+        let client = Peer::new(0);
+        let (connection_id, io_handler, mut incoming) =
+            client.add_test_connection(client_conn, executor.clone());
+        executor.spawn(io_handler).detach();
+        executor
+            .spawn(async move { incoming.next().await })
+            .detach();
+
+        let response = executor.spawn(client.request(connection_id, proto::Ping {}));
+        let _request = server_conn.rx.next().await.unwrap().unwrap();
+
+        drop(server_conn);
+        assert_eq!(
+            response.await.unwrap_err().to_string(),
+            "connection was closed"
+        );
+    }
+}

crates/rpc2/src/proto.rs 🔗

@@ -0,0 +1,674 @@
+#![allow(non_snake_case)]
+
+use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
+use anyhow::{anyhow, Result};
+use async_tungstenite::tungstenite::Message as WebSocketMessage;
+use collections::HashMap;
+use futures::{SinkExt as _, StreamExt as _};
+use prost::Message as _;
+use serde::Serialize;
+use std::any::{Any, TypeId};
+use std::{
+    cmp,
+    fmt::Debug,
+    io, iter,
+    time::{Duration, SystemTime, UNIX_EPOCH},
+};
+use std::{fmt, mem};
+
+include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
+
+pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 'static {
+    const NAME: &'static str;
+    const PRIORITY: MessagePriority;
+    fn into_envelope(
+        self,
+        id: u32,
+        responding_to: Option<u32>,
+        original_sender_id: Option<PeerId>,
+    ) -> Envelope;
+    fn from_envelope(envelope: Envelope) -> Option<Self>;
+}
+
+pub trait EntityMessage: EnvelopedMessage {
+    fn remote_entity_id(&self) -> u64;
+}
+
+pub trait RequestMessage: EnvelopedMessage {
+    type Response: EnvelopedMessage;
+}
+
+pub trait AnyTypedEnvelope: 'static + Send + Sync {
+    fn payload_type_id(&self) -> TypeId;
+    fn payload_type_name(&self) -> &'static str;
+    fn as_any(&self) -> &dyn Any;
+    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
+    fn is_background(&self) -> bool;
+    fn original_sender_id(&self) -> Option<PeerId>;
+    fn sender_id(&self) -> ConnectionId;
+    fn message_id(&self) -> u32;
+}
+
+pub enum MessagePriority {
+    Foreground,
+    Background,
+}
+
+impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
+    fn payload_type_id(&self) -> TypeId {
+        TypeId::of::<T>()
+    }
+
+    fn payload_type_name(&self) -> &'static str {
+        T::NAME
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
+        self
+    }
+
+    fn is_background(&self) -> bool {
+        matches!(T::PRIORITY, MessagePriority::Background)
+    }
+
+    fn original_sender_id(&self) -> Option<PeerId> {
+        self.original_sender_id
+    }
+
+    fn sender_id(&self) -> ConnectionId {
+        self.sender_id
+    }
+
+    fn message_id(&self) -> u32 {
+        self.message_id
+    }
+}
+
+impl PeerId {
+    pub fn from_u64(peer_id: u64) -> Self {
+        let owner_id = (peer_id >> 32) as u32;
+        let id = peer_id as u32;
+        Self { owner_id, id }
+    }
+
+    pub fn as_u64(self) -> u64 {
+        ((self.owner_id as u64) << 32) | (self.id as u64)
+    }
+}
+
+impl Copy for PeerId {}
+
+impl Eq for PeerId {}
+
+impl Ord for PeerId {
+    fn cmp(&self, other: &Self) -> cmp::Ordering {
+        self.owner_id
+            .cmp(&other.owner_id)
+            .then_with(|| self.id.cmp(&other.id))
+    }
+}
+
+impl PartialOrd for PeerId {
+    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl std::hash::Hash for PeerId {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.owner_id.hash(state);
+        self.id.hash(state);
+    }
+}
+
+impl fmt::Display for PeerId {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{}/{}", self.owner_id, self.id)
+    }
+}
+
+messages!(
+    (Ack, Foreground),
+    (AddProjectCollaborator, Foreground),
+    (ApplyCodeAction, Background),
+    (ApplyCodeActionResponse, Background),
+    (ApplyCompletionAdditionalEdits, Background),
+    (ApplyCompletionAdditionalEditsResponse, Background),
+    (BufferReloaded, Foreground),
+    (BufferSaved, Foreground),
+    (Call, Foreground),
+    (CallCanceled, Foreground),
+    (CancelCall, Foreground),
+    (CopyProjectEntry, Foreground),
+    (CreateBufferForPeer, Foreground),
+    (CreateChannel, Foreground),
+    (CreateChannelResponse, Foreground),
+    (ChannelMessageSent, Foreground),
+    (CreateProjectEntry, Foreground),
+    (CreateRoom, Foreground),
+    (CreateRoomResponse, Foreground),
+    (DeclineCall, Foreground),
+    (DeleteProjectEntry, Foreground),
+    (Error, Foreground),
+    (ExpandProjectEntry, Foreground),
+    (Follow, Foreground),
+    (FollowResponse, Foreground),
+    (FormatBuffers, Foreground),
+    (FormatBuffersResponse, Foreground),
+    (FuzzySearchUsers, Foreground),
+    (GetCodeActions, Background),
+    (GetCodeActionsResponse, Background),
+    (GetHover, Background),
+    (GetHoverResponse, Background),
+    (GetChannelMessages, Background),
+    (GetChannelMessagesResponse, Background),
+    (SendChannelMessage, Background),
+    (SendChannelMessageResponse, Background),
+    (GetCompletions, Background),
+    (GetCompletionsResponse, Background),
+    (GetDefinition, Background),
+    (GetDefinitionResponse, Background),
+    (GetTypeDefinition, Background),
+    (GetTypeDefinitionResponse, Background),
+    (GetDocumentHighlights, Background),
+    (GetDocumentHighlightsResponse, Background),
+    (GetReferences, Background),
+    (GetReferencesResponse, Background),
+    (GetProjectSymbols, Background),
+    (GetProjectSymbolsResponse, Background),
+    (GetUsers, Foreground),
+    (Hello, Foreground),
+    (IncomingCall, Foreground),
+    (InviteChannelMember, Foreground),
+    (UsersResponse, Foreground),
+    (JoinProject, Foreground),
+    (JoinProjectResponse, Foreground),
+    (JoinRoom, Foreground),
+    (JoinRoomResponse, Foreground),
+    (JoinChannelChat, Foreground),
+    (JoinChannelChatResponse, Foreground),
+    (LeaveChannelChat, Foreground),
+    (LeaveProject, Foreground),
+    (LeaveRoom, Foreground),
+    (OpenBufferById, Background),
+    (OpenBufferByPath, Background),
+    (OpenBufferForSymbol, Background),
+    (OpenBufferForSymbolResponse, Background),
+    (OpenBufferResponse, Background),
+    (PerformRename, Background),
+    (PerformRenameResponse, Background),
+    (OnTypeFormatting, Background),
+    (OnTypeFormattingResponse, Background),
+    (InlayHints, Background),
+    (InlayHintsResponse, Background),
+    (ResolveInlayHint, Background),
+    (ResolveInlayHintResponse, Background),
+    (RefreshInlayHints, Foreground),
+    (Ping, Foreground),
+    (PrepareRename, Background),
+    (PrepareRenameResponse, Background),
+    (ExpandProjectEntryResponse, Foreground),
+    (ProjectEntryResponse, Foreground),
+    (RejoinRoom, Foreground),
+    (RejoinRoomResponse, Foreground),
+    (RemoveContact, Foreground),
+    (RemoveChannelMember, Foreground),
+    (RemoveChannelMessage, Foreground),
+    (ReloadBuffers, Foreground),
+    (ReloadBuffersResponse, Foreground),
+    (RemoveProjectCollaborator, Foreground),
+    (RenameProjectEntry, Foreground),
+    (RequestContact, Foreground),
+    (RespondToContactRequest, Foreground),
+    (RespondToChannelInvite, Foreground),
+    (JoinChannel, Foreground),
+    (RoomUpdated, Foreground),
+    (SaveBuffer, Foreground),
+    (RenameChannel, Foreground),
+    (RenameChannelResponse, Foreground),
+    (SetChannelMemberAdmin, Foreground),
+    (SearchProject, Background),
+    (SearchProjectResponse, Background),
+    (ShareProject, Foreground),
+    (ShareProjectResponse, Foreground),
+    (ShowContacts, Foreground),
+    (StartLanguageServer, Foreground),
+    (SynchronizeBuffers, Foreground),
+    (SynchronizeBuffersResponse, Foreground),
+    (RejoinChannelBuffers, Foreground),
+    (RejoinChannelBuffersResponse, Foreground),
+    (Test, Foreground),
+    (Unfollow, Foreground),
+    (UnshareProject, Foreground),
+    (UpdateBuffer, Foreground),
+    (UpdateBufferFile, Foreground),
+    (UpdateContacts, Foreground),
+    (DeleteChannel, Foreground),
+    (MoveChannel, Foreground),
+    (LinkChannel, Foreground),
+    (UnlinkChannel, Foreground),
+    (UpdateChannels, Foreground),
+    (UpdateDiagnosticSummary, Foreground),
+    (UpdateFollowers, Foreground),
+    (UpdateInviteInfo, Foreground),
+    (UpdateLanguageServer, Foreground),
+    (UpdateParticipantLocation, Foreground),
+    (UpdateProject, Foreground),
+    (UpdateProjectCollaborator, Foreground),
+    (UpdateWorktree, Foreground),
+    (UpdateWorktreeSettings, Foreground),
+    (UpdateDiffBase, Foreground),
+    (GetPrivateUserInfo, Foreground),
+    (GetPrivateUserInfoResponse, Foreground),
+    (GetChannelMembers, Foreground),
+    (GetChannelMembersResponse, Foreground),
+    (JoinChannelBuffer, Foreground),
+    (JoinChannelBufferResponse, Foreground),
+    (LeaveChannelBuffer, Background),
+    (UpdateChannelBuffer, Foreground),
+    (UpdateChannelBufferCollaborators, Foreground),
+    (AckBufferOperation, Background),
+    (AckChannelMessage, Background),
+);
+
+request_messages!(
+    (ApplyCodeAction, ApplyCodeActionResponse),
+    (
+        ApplyCompletionAdditionalEdits,
+        ApplyCompletionAdditionalEditsResponse
+    ),
+    (Call, Ack),
+    (CancelCall, Ack),
+    (CopyProjectEntry, ProjectEntryResponse),
+    (CreateProjectEntry, ProjectEntryResponse),
+    (CreateRoom, CreateRoomResponse),
+    (CreateChannel, CreateChannelResponse),
+    (DeclineCall, Ack),
+    (DeleteProjectEntry, ProjectEntryResponse),
+    (ExpandProjectEntry, ExpandProjectEntryResponse),
+    (Follow, FollowResponse),
+    (FormatBuffers, FormatBuffersResponse),
+    (GetCodeActions, GetCodeActionsResponse),
+    (GetHover, GetHoverResponse),
+    (GetCompletions, GetCompletionsResponse),
+    (GetDefinition, GetDefinitionResponse),
+    (GetTypeDefinition, GetTypeDefinitionResponse),
+    (GetDocumentHighlights, GetDocumentHighlightsResponse),
+    (GetReferences, GetReferencesResponse),
+    (GetPrivateUserInfo, GetPrivateUserInfoResponse),
+    (GetProjectSymbols, GetProjectSymbolsResponse),
+    (FuzzySearchUsers, UsersResponse),
+    (GetUsers, UsersResponse),
+    (InviteChannelMember, Ack),
+    (JoinProject, JoinProjectResponse),
+    (JoinRoom, JoinRoomResponse),
+    (JoinChannelChat, JoinChannelChatResponse),
+    (LeaveRoom, Ack),
+    (RejoinRoom, RejoinRoomResponse),
+    (IncomingCall, Ack),
+    (OpenBufferById, OpenBufferResponse),
+    (OpenBufferByPath, OpenBufferResponse),
+    (OpenBufferForSymbol, OpenBufferForSymbolResponse),
+    (Ping, Ack),
+    (PerformRename, PerformRenameResponse),
+    (PrepareRename, PrepareRenameResponse),
+    (OnTypeFormatting, OnTypeFormattingResponse),
+    (InlayHints, InlayHintsResponse),
+    (ResolveInlayHint, ResolveInlayHintResponse),
+    (RefreshInlayHints, Ack),
+    (ReloadBuffers, ReloadBuffersResponse),
+    (RequestContact, Ack),
+    (RemoveChannelMember, Ack),
+    (RemoveContact, Ack),
+    (RespondToContactRequest, Ack),
+    (RespondToChannelInvite, Ack),
+    (SetChannelMemberAdmin, Ack),
+    (SendChannelMessage, SendChannelMessageResponse),
+    (GetChannelMessages, GetChannelMessagesResponse),
+    (GetChannelMembers, GetChannelMembersResponse),
+    (JoinChannel, JoinRoomResponse),
+    (RemoveChannelMessage, Ack),
+    (DeleteChannel, Ack),
+    (RenameProjectEntry, ProjectEntryResponse),
+    (RenameChannel, RenameChannelResponse),
+    (LinkChannel, Ack),
+    (UnlinkChannel, Ack),
+    (MoveChannel, Ack),
+    (SaveBuffer, BufferSaved),
+    (SearchProject, SearchProjectResponse),
+    (ShareProject, ShareProjectResponse),
+    (SynchronizeBuffers, SynchronizeBuffersResponse),
+    (RejoinChannelBuffers, RejoinChannelBuffersResponse),
+    (Test, Test),
+    (UpdateBuffer, Ack),
+    (UpdateParticipantLocation, Ack),
+    (UpdateProject, Ack),
+    (UpdateWorktree, Ack),
+    (JoinChannelBuffer, JoinChannelBufferResponse),
+    (LeaveChannelBuffer, Ack)
+);
+
+entity_messages!(
+    project_id,
+    AddProjectCollaborator,
+    ApplyCodeAction,
+    ApplyCompletionAdditionalEdits,
+    BufferReloaded,
+    BufferSaved,
+    CopyProjectEntry,
+    CreateBufferForPeer,
+    CreateProjectEntry,
+    DeleteProjectEntry,
+    ExpandProjectEntry,
+    FormatBuffers,
+    GetCodeActions,
+    GetCompletions,
+    GetDefinition,
+    GetTypeDefinition,
+    GetDocumentHighlights,
+    GetHover,
+    GetReferences,
+    GetProjectSymbols,
+    JoinProject,
+    LeaveProject,
+    OpenBufferById,
+    OpenBufferByPath,
+    OpenBufferForSymbol,
+    PerformRename,
+    OnTypeFormatting,
+    InlayHints,
+    ResolveInlayHint,
+    RefreshInlayHints,
+    PrepareRename,
+    ReloadBuffers,
+    RemoveProjectCollaborator,
+    RenameProjectEntry,
+    SaveBuffer,
+    SearchProject,
+    StartLanguageServer,
+    SynchronizeBuffers,
+    UnshareProject,
+    UpdateBuffer,
+    UpdateBufferFile,
+    UpdateDiagnosticSummary,
+    UpdateLanguageServer,
+    UpdateProject,
+    UpdateProjectCollaborator,
+    UpdateWorktree,
+    UpdateWorktreeSettings,
+    UpdateDiffBase
+);
+
+entity_messages!(
+    channel_id,
+    ChannelMessageSent,
+    UpdateChannelBuffer,
+    RemoveChannelMessage,
+    UpdateChannelBufferCollaborators,
+);
+
+const KIB: usize = 1024;
+const MIB: usize = KIB * 1024;
+const MAX_BUFFER_LEN: usize = MIB;
+
+/// A stream of protobuf messages.
+pub struct MessageStream<S> {
+    stream: S,
+    encoding_buffer: Vec<u8>,
+}
+
+#[allow(clippy::large_enum_variant)]
+#[derive(Debug)]
+pub enum Message {
+    Envelope(Envelope),
+    Ping,
+    Pong,
+}
+
+impl<S> MessageStream<S> {
+    pub fn new(stream: S) -> Self {
+        Self {
+            stream,
+            encoding_buffer: Vec::new(),
+        }
+    }
+
+    pub fn inner_mut(&mut self) -> &mut S {
+        &mut self.stream
+    }
+}
+
+impl<S> MessageStream<S>
+where
+    S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
+{
+    pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
+        #[cfg(any(test, feature = "test-support"))]
+        const COMPRESSION_LEVEL: i32 = -7;
+
+        #[cfg(not(any(test, feature = "test-support")))]
+        const COMPRESSION_LEVEL: i32 = 4;
+
+        match message {
+            Message::Envelope(message) => {
+                self.encoding_buffer.reserve(message.encoded_len());
+                message
+                    .encode(&mut self.encoding_buffer)
+                    .map_err(io::Error::from)?;
+                let buffer =
+                    zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
+                        .unwrap();
+
+                self.encoding_buffer.clear();
+                self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
+                self.stream.send(WebSocketMessage::Binary(buffer)).await?;
+            }
+            Message::Ping => {
+                self.stream
+                    .send(WebSocketMessage::Ping(Default::default()))
+                    .await?;
+            }
+            Message::Pong => {
+                self.stream
+                    .send(WebSocketMessage::Pong(Default::default()))
+                    .await?;
+            }
+        }
+
+        Ok(())
+    }
+}
+
+impl<S> MessageStream<S>
+where
+    S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
+{
+    pub async fn read(&mut self) -> Result<Message, anyhow::Error> {
+        while let Some(bytes) = self.stream.next().await {
+            match bytes? {
+                WebSocketMessage::Binary(bytes) => {
+                    zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
+                    let envelope = Envelope::decode(self.encoding_buffer.as_slice())
+                        .map_err(io::Error::from)?;
+
+                    self.encoding_buffer.clear();
+                    self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
+                    return Ok(Message::Envelope(envelope));
+                }
+                WebSocketMessage::Ping(_) => return Ok(Message::Ping),
+                WebSocketMessage::Pong(_) => return Ok(Message::Pong),
+                WebSocketMessage::Close(_) => break,
+                _ => {}
+            }
+        }
+        Err(anyhow!("connection closed"))
+    }
+}
+
+impl From<Timestamp> for SystemTime {
+    fn from(val: Timestamp) -> Self {
+        UNIX_EPOCH
+            .checked_add(Duration::new(val.seconds, val.nanos))
+            .unwrap()
+    }
+}
+
+impl From<SystemTime> for Timestamp {
+    fn from(time: SystemTime) -> Self {
+        let duration = time.duration_since(UNIX_EPOCH).unwrap();
+        Self {
+            seconds: duration.as_secs(),
+            nanos: duration.subsec_nanos(),
+        }
+    }
+}
+
+impl From<u128> for Nonce {
+    fn from(nonce: u128) -> Self {
+        let upper_half = (nonce >> 64) as u64;
+        let lower_half = nonce as u64;
+        Self {
+            upper_half,
+            lower_half,
+        }
+    }
+}
+
+impl From<Nonce> for u128 {
+    fn from(nonce: Nonce) -> Self {
+        let upper_half = (nonce.upper_half as u128) << 64;
+        let lower_half = nonce.lower_half as u128;
+        upper_half | lower_half
+    }
+}
+
+pub fn split_worktree_update(
+    mut message: UpdateWorktree,
+    max_chunk_size: usize,
+) -> impl Iterator<Item = UpdateWorktree> {
+    let mut done_files = false;
+
+    let mut repository_map = message
+        .updated_repositories
+        .into_iter()
+        .map(|repo| (repo.work_directory_id, repo))
+        .collect::<HashMap<_, _>>();
+
+    iter::from_fn(move || {
+        if done_files {
+            return None;
+        }
+
+        let updated_entries_chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
+        let updated_entries: Vec<_> = message
+            .updated_entries
+            .drain(..updated_entries_chunk_size)
+            .collect();
+
+        let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size);
+        let removed_entries = message
+            .removed_entries
+            .drain(..removed_entries_chunk_size)
+            .collect();
+
+        done_files = message.updated_entries.is_empty() && message.removed_entries.is_empty();
+
+        let mut updated_repositories = Vec::new();
+
+        if !repository_map.is_empty() {
+            for entry in &updated_entries {
+                if let Some(repo) = repository_map.remove(&entry.id) {
+                    updated_repositories.push(repo)
+                }
+            }
+        }
+
+        let removed_repositories = if done_files {
+            mem::take(&mut message.removed_repositories)
+        } else {
+            Default::default()
+        };
+
+        if done_files {
+            updated_repositories.extend(mem::take(&mut repository_map).into_values());
+        }
+
+        Some(UpdateWorktree {
+            project_id: message.project_id,
+            worktree_id: message.worktree_id,
+            root_name: message.root_name.clone(),
+            abs_path: message.abs_path.clone(),
+            updated_entries,
+            removed_entries,
+            scan_id: message.scan_id,
+            is_last_update: done_files && message.is_last_update,
+            updated_repositories,
+            removed_repositories,
+        })
+    })
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[gpui2::test]
+    async fn test_buffer_size() {
+        let (tx, rx) = futures::channel::mpsc::unbounded();
+        let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
+        sink.write(Message::Envelope(Envelope {
+            payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
+                root_name: "abcdefg".repeat(10),
+                ..Default::default()
+            })),
+            ..Default::default()
+        }))
+        .await
+        .unwrap();
+        assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
+        sink.write(Message::Envelope(Envelope {
+            payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
+                root_name: "abcdefg".repeat(1000000),
+                ..Default::default()
+            })),
+            ..Default::default()
+        }))
+        .await
+        .unwrap();
+        assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
+
+        let mut stream = MessageStream::new(rx.map(anyhow::Ok));
+        stream.read().await.unwrap();
+        assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
+        stream.read().await.unwrap();
+        assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
+    }
+
+    #[gpui2::test]
+    fn test_converting_peer_id_from_and_to_u64() {
+        let peer_id = PeerId {
+            owner_id: 10,
+            id: 3,
+        };
+        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
+        let peer_id = PeerId {
+            owner_id: u32::MAX,
+            id: 3,
+        };
+        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
+        let peer_id = PeerId {
+            owner_id: 10,
+            id: u32::MAX,
+        };
+        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
+        let peer_id = PeerId {
+            owner_id: u32::MAX,
+            id: u32::MAX,
+        };
+        assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id);
+    }
+}

crates/rpc2/src/rpc.rs 🔗

@@ -0,0 +1,9 @@
+pub mod auth;
+mod conn;
+mod peer;
+pub mod proto;
+pub use conn::Connection;
+pub use peer::*;
+mod macros;
+
+pub const PROTOCOL_VERSION: u32 = 64;

crates/zed2/Cargo.toml 🔗

@@ -57,7 +57,7 @@ project2 = { path = "../project2" }
 # project_symbols = { path = "../project_symbols" }
 # quick_action_bar = { path = "../quick_action_bar" }
 # recent_projects = { path = "../recent_projects" }
-rpc = { path = "../rpc" }
+rpc2 = { path = "../rpc2" }
 settings2 = { path = "../settings2" }
 feature_flags = { path = "../feature_flags" }
 sum_tree = { path = "../sum_tree" }