acp: Use new Rust SDK (#52997)

Ben Brandt created

Testing out Niko's new SDK design

Self-Review Checklist:

- [x] I've reviewed my own diff for quality, security, and reliability
- [x] Unsafe blocks (if any) have justifying comments
- [x] The content is consistent with the [UI/UX
checklist](https://github.com/zed-industries/zed/blob/main/CONTRIBUTING.md#uiux-checklist)
- [x] Tests cover the new/changed behavior
- [x] Performance impact has been considered and is acceptable


Release Notes:

- N/A

Change summary

Cargo.lock                                            | 342 ++++-
Cargo.toml                                            |   2 
crates/acp_thread/src/acp_thread.rs                   |   2 
crates/acp_thread/src/connection.rs                   |   4 
crates/acp_thread/src/mention.rs                      |   2 
crates/acp_thread/src/terminal.rs                     |   2 
crates/acp_tools/Cargo.toml                           |   5 
crates/acp_tools/src/acp_tools.rs                     | 414 ++++++-
crates/agent/src/agent.rs                             |   2 
crates/agent/src/db.rs                                |   2 
crates/agent/src/native_agent_server.rs               |   2 
crates/agent/src/tests/mod.rs                         |   4 
crates/agent/src/thread.rs                            |   4 
crates/agent/src/thread_store.rs                      |   2 
crates/agent/src/tools/context_server_registry.rs     |   6 
crates/agent/src/tools/copy_path_tool.rs              |   7 
crates/agent/src/tools/create_directory_tool.rs       |   7 
crates/agent/src/tools/delete_path_tool.rs            |   7 
crates/agent/src/tools/diagnostics_tool.rs            |   2 
crates/agent/src/tools/edit_file_tool.rs              |   6 
crates/agent/src/tools/fetch_tool.rs                  |   2 
crates/agent/src/tools/find_path_tool.rs              |   2 
crates/agent/src/tools/grep_tool.rs                   |   2 
crates/agent/src/tools/list_directory_tool.rs         |   7 
crates/agent/src/tools/move_path_tool.rs              |   7 
crates/agent/src/tools/now_tool.rs                    |   2 
crates/agent/src/tools/open_tool.rs                   |   6 
crates/agent/src/tools/read_file_tool.rs              |   9 
crates/agent/src/tools/restore_file_from_disk_tool.rs |   2 
crates/agent/src/tools/save_file_tool.rs              |   2 
crates/agent/src/tools/spawn_agent_tool.rs            |   2 
crates/agent/src/tools/streaming_edit_file_tool.rs    |   2 
crates/agent/src/tools/terminal_tool.rs               |   2 
crates/agent/src/tools/update_plan_tool.rs            |   2 
crates/agent/src/tools/web_search_tool.rs             |   2 
crates/agent_servers/Cargo.toml                       |   5 
crates/agent_servers/src/acp.rs                       | 744 +++++++++---
crates/agent_servers/src/agent_servers.rs             |  21 
crates/agent_servers/src/custom.rs                    |   2 
crates/agent_servers/src/e2e_tests.rs                 |   4 
crates/agent_settings/src/agent_settings.rs           |   6 
crates/agent_ui/src/agent_panel.rs                    |   2 
crates/agent_ui/src/agent_ui.rs                       |   6 
crates/agent_ui/src/completion_provider.rs            |   2 
crates/agent_ui/src/config_options.rs                 |   2 
crates/agent_ui/src/conversation_view.rs              |  23 
crates/agent_ui/src/conversation_view/thread_view.rs  |  10 
crates/agent_ui/src/entry_view_state.rs               |  10 
crates/agent_ui/src/mention_set.rs                    |   2 
crates/agent_ui/src/message_editor.rs                 |   4 
crates/agent_ui/src/mode_selector.rs                  |   2 
crates/agent_ui/src/model_selector.rs                 |  13 
crates/agent_ui/src/test_support.rs                   |   2 
crates/agent_ui/src/thread_import.rs                  |   2 
crates/agent_ui/src/thread_metadata_store.rs          |   5 
crates/agent_ui/src/threads_archive_view.rs           |   2 
crates/agent_ui/src/ui/mention_crease.rs              |   2 
crates/eval_cli/src/main.rs                           |   2 
crates/sidebar/src/sidebar.rs                         |   2 
crates/sidebar/src/thread_switcher.rs                 |   2 
crates/zed/src/main.rs                                |   4 
crates/zed/src/visual_test_runner.rs                  |   2 
62 files changed, 1,283 insertions(+), 475 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -55,11 +55,13 @@ dependencies = [
  "collections",
  "gpui",
  "language",
+ "log",
  "markdown",
  "project",
  "serde",
  "serde_json",
  "settings",
+ "smol",
  "theme_settings",
  "ui",
  "util",
@@ -190,7 +192,7 @@ dependencies = [
  "regex",
  "reqwest_client",
  "rust-embed",
- "schemars",
+ "schemars 1.0.4",
  "serde",
  "serde_json",
  "settings",
@@ -220,33 +222,53 @@ dependencies = [
 
 [[package]]
 name = "agent-client-protocol"
-version = "0.10.2"
+version = "0.11.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9c56a59cf6315e99f874d2c1f96c69d2da5ffe0087d211297fc4a41f849770a2"
+checksum = "2af62fb84df2af0f933d8f5fd78b843fa5eb0ec5a48fa1b528c41951d0bbe36c"
 dependencies = [
+ "agent-client-protocol-derive",
  "agent-client-protocol-schema",
  "anyhow",
- "async-broadcast",
- "async-trait",
- "derive_more",
  "futures 0.3.32",
- "log",
+ "futures-concurrency",
+ "jsonrpcmsg",
+ "rmcp",
+ "rustc-hash 2.1.1",
+ "schemars 1.0.4",
  "serde",
  "serde_json",
+ "thiserror 2.0.17",
+ "tokio",
+ "tokio-util",
+ "tracing",
+ "uuid",
+]
+
+[[package]]
+name = "agent-client-protocol-derive"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ce42c2d3c048c12897eef2e577dfff1e3355c632c9f1625cc953b9df48b44631"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.117",
 ]
 
 [[package]]
 name = "agent-client-protocol-schema"
-version = "0.11.2"
+version = "0.12.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e0497b9a95a404e35799904835c57c6f8c69b9d08ccfd3cb5b7d746425cd6789"
+checksum = "49bae57dad1c28a362fbdcf7bab0583316a02b45a70792109fced55780a3b63c"
 dependencies = [
  "anyhow",
  "derive_more",
- "schemars",
+ "schemars 1.0.4",
  "serde",
  "serde_json",
+ "serde_with",
  "strum 0.28.0",
+ "tracing",
 ]
 
 [[package]]
@@ -258,8 +280,6 @@ dependencies = [
  "action_log",
  "agent-client-protocol",
  "anyhow",
- "async-pipe",
- "async-trait",
  "chrono",
  "client",
  "collections",
@@ -311,7 +331,7 @@ dependencies = [
  "paths",
  "project",
  "regex",
- "schemars",
+ "schemars 1.0.4",
  "serde",
  "serde_json",
  "serde_json_lenient",
@@ -391,7 +411,7 @@ dependencies = [
  "reqwest_client",
  "rope",
  "rules_library",
- "schemars",
+ "schemars 1.0.4",
  "search",
  "semver",
  "serde",
@@ -656,7 +676,7 @@ dependencies = [
  "http_client",
  "language_model_core",
  "log",
- "schemars",
+ "schemars 1.0.4",
  "serde",
  "serde_json",
  "strum 0.27.2",
@@ -1304,7 +1324,7 @@ dependencies = [
  "log",
  "num-rational",
  "num-traits",
- "pastey",
+ "pastey 0.1.1",
  "rayon",
  "thiserror 2.0.17",
  "v_frame",
@@ -1932,7 +1952,7 @@ dependencies = [
  "aws-sdk-bedrockruntime",
  "aws-smithy-types",
  "futures 0.3.32",
- "schemars",
+ "schemars 1.0.4",
  "serde",
  "serde_json",
  "strum 0.27.2",
@@ -2144,7 +2164,7 @@ version = "3.8.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "89ec27229c38ed0eb3c0feee3d2c1d6a4379ae44f418a29a658890e062d8f365"
 dependencies = [
- "darling 0.21.3",
+ "darling 0.23.0",
  "ident_case",
  "prettyplease",
  "proc-macro2",
@@ -2672,7 +2692,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "eadd868a2ce9ca38de7eeafdcec9c7065ef89b42b32f0839278d55f35c54d1ff"
 dependencies = [
  "heck 0.4.1",
- "indexmap",
+ "indexmap 2.11.4",
  "log",
  "proc-macro2",
  "quote",
@@ -3292,7 +3312,7 @@ dependencies = [
 name = "collections"
 version = "0.1.0"
 dependencies = [
- "indexmap",
+ "indexmap 2.11.4",
  "rustc-hash 2.1.1",
 ]
 
@@ -3548,7 +3568,7 @@ dependencies = [
  "parking_lot",
  "postage",
  "rand 0.9.3",
- "schemars",
+ "schemars 1.0.4",
  "serde",
  "serde_json",
  "settings",
@@ -4377,7 +4397,7 @@ checksum = "d74b6bcf49ebbd91f1b1875b706ea46545032a14003b5557b7dfa4bbeba6766e"
 dependencies = [
  "cc",
  "codespan-reporting",
- "indexmap",
+ "indexmap 2.11.4",
  "proc-macro2",
  "quote",
  "scratch",
@@ -4392,7 +4412,7 @@ checksum = "94ca2ad69673c4b35585edfa379617ac364bccd0ba0adf319811ba3a74ffa48a"
 dependencies = [
  "clap",
  "codespan-reporting",
- "indexmap",
+ "indexmap 2.11.4",
  "proc-macro2",
  "quote",
  "syn 2.0.117",
@@ -4410,7 +4430,7 @@ version = "1.0.187"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "2a8ebf0b6138325af3ec73324cb3a48b64d57721f17291b151206782e61f66cd"
 dependencies = [
- "indexmap",
+ "indexmap 2.11.4",
  "proc-macro2",
  "quote",
  "syn 2.0.117",
@@ -4439,7 +4459,7 @@ dependencies = [
  "parking_lot",
  "paths",
  "proto",
- "schemars",
+ "schemars 1.0.4",
  "serde",
  "serde_json",
  "settings",
@@ -4456,7 +4476,7 @@ name = "dap-types"
 version = "0.0.1"
 source = "git+https://github.com/zed-industries/dap-types?rev=1b461b310481d01e02b2603c16d7144b926339f8#1b461b310481d01e02b2603c16d7144b926339f8"
 dependencies = [
- "schemars",
+ "schemars 1.0.4",
  "serde",
  "serde_json",
 ]
@@ -4507,6 +4527,16 @@ dependencies = [
  "darling_macro 0.21.3",
 ]
 
+[[package]]
+name = "darling"
+version = "0.23.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "25ae13da2f202d56bd7f91c25fba009e7717a1e4a1cc98a76d844b65ae912e9d"
+dependencies = [
+ "darling_core 0.23.0",
+ "darling_macro 0.23.0",
+]
+
 [[package]]
 name = "darling_core"
 version = "0.20.11"
@@ -4535,6 +4565,19 @@ dependencies = [
  "syn 2.0.117",
 ]
 
+[[package]]
+name = "darling_core"
+version = "0.23.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9865a50f7c335f53564bb694ef660825eb8610e0a53d3e11bf1b0d3df31e03b0"
+dependencies = [
+ "ident_case",
+ "proc-macro2",
+ "quote",
+ "strsim",
+ "syn 2.0.117",
+]
+
 [[package]]
 name = "darling_macro"
 version = "0.20.11"
@@ -4557,6 +4600,17 @@ dependencies = [
  "syn 2.0.117",
 ]
 
+[[package]]
+name = "darling_macro"
+version = "0.23.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ac3984ec7bd6cfa798e62b4a642426a5be0e68f9401cfc2a01e3fa9ea2fcdb8d"
+dependencies = [
+ "darling_core 0.23.0",
+ "quote",
+ "syn 2.0.117",
+]
+
 [[package]]
 name = "dashmap"
 version = "6.1.0"
@@ -4686,7 +4740,7 @@ dependencies = [
  "pretty_assertions",
  "project",
  "rpc",
- "schemars",
+ "schemars 1.0.4",
  "serde",
  "serde_json",
  "serde_json_lenient",
@@ -4726,7 +4780,7 @@ dependencies = [
  "anyhow",
  "futures 0.3.32",
  "http_client",
- "schemars",
+ "schemars 1.0.4",
  "serde",
  "serde_json",
 ]
@@ -5005,7 +5059,7 @@ dependencies = [
  "jsonschema",
  "mdbook",
  "regex",
- "schemars",
+ "schemars 1.0.4",
  "serde",
  "serde_json",
  "settings",
@@ -5441,7 +5495,7 @@ dependencies = [
  "release_channel",
  "rope",
  "rpc",
- "schemars",
+ "schemars 1.0.4",
  "semver",
  "serde",
  "serde_json",
@@ -6185,7 +6239,7 @@ dependencies = [
  "fs",
  "gpui",
  "inventory",
- "schemars",
+ "schemars 1.0.4",
  "serde_json",
  "settings",
 ]
@@ -7138,7 +7192,7 @@ dependencies = [
  "derive_more",
  "derive_setters",
  "gh-workflow-macros",
- "indexmap",
+ "indexmap 2.11.4",
  "merge",
  "serde",
  "serde_json",
@@ -7183,7 +7237,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "e629b9b98ef3dd8afe6ca2bd0f89306cec16d43d907889945bc5d6687f2f13c7"
 dependencies = [
  "fallible-iterator",
- "indexmap",
+ "indexmap 2.11.4",
  "stable_deref_trait",
 ]
 
@@ -7220,7 +7274,7 @@ dependencies = [
  "rand 0.9.3",
  "regex",
  "rope",
- "schemars",
+ "schemars 1.0.4",
  "serde",
  "serde_json",
  "smallvec",
@@ -7342,7 +7396,7 @@ dependencies = [
  "rand 0.9.3",
  "remote",
  "remote_connection",
- "schemars",
+ "schemars 1.0.4",
  "serde",
  "serde_json",
  "settings",
@@ -7528,7 +7582,7 @@ dependencies = [
  "http_client",
  "language_model_core",
  "log",
- "schemars",
+ "schemars 1.0.4",
  "serde",
  "serde_json",
  "strum 0.27.2",
@@ -7633,7 +7687,7 @@ dependencies = [
  "reqwest_client",
  "resvg",
  "scheduler",
- "schemars",
+ "schemars 1.0.4",
  "seahash",
  "serde",
  "serde_json",
@@ -7778,7 +7832,7 @@ version = "0.1.0"
 dependencies = [
  "derive_more",
  "gpui_util",
- "schemars",
+ "schemars 1.0.4",
  "serde",
 ]
 
@@ -7933,7 +7987,7 @@ dependencies = [
  "futures-sink",
  "futures-util",
  "http 0.2.12",
- "indexmap",
+ "indexmap 2.11.4",
  "slab",
  "tokio",
  "tokio-util",
@@ -7952,7 +8006,7 @@ dependencies = [
  "futures-core",
  "futures-sink",
  "http 1.3.1",
- "indexmap",
+ "indexmap 2.11.4",
  "slab",
  "tokio",
  "tokio-util",
@@ -8779,6 +8833,17 @@ version = "0.3.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "964de6e86d545b246d84badc0fef527924ace5134f30641c203ef52ba83f58d5"
 
+[[package]]
+name = "indexmap"
+version = "1.9.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99"
+dependencies = [
+ "autocfg",
+ "hashbrown 0.12.3",
+ "serde",
+]
+
 [[package]]
 name = "indexmap"
 version = "2.11.4"
@@ -9189,7 +9254,7 @@ dependencies = [
  "parking_lot",
  "paths",
  "project",
- "schemars",
+ "schemars 1.0.4",
  "serde",
  "serde_json",
  "settings",
@@ -9199,6 +9264,16 @@ dependencies = [
  "util",
 ]
 
+[[package]]
+name = "jsonrpcmsg"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6d833a15225c779251e13929203518c2ff26e2fe0f322d584b213f4f4dad37bd"
+dependencies = [
+ "serde",
+ "serde_json",
+]
+
 [[package]]
 name = "jsonschema"
 version = "0.37.4"
@@ -9451,7 +9526,7 @@ dependencies = [
  "lsp",
  "parking_lot",
  "regex",
- "schemars",
+ "schemars 1.0.4",
  "serde",
  "serde_json",
  "toml 0.8.23",
@@ -9514,7 +9589,7 @@ dependencies = [
  "gpui_shared_string",
  "http_client",
  "partial-json-fixer",
- "schemars",
+ "schemars 1.0.4",
  "serde",
  "serde_json",
  "smol",
@@ -9566,7 +9641,7 @@ dependencies = [
  "opencode",
  "pretty_assertions",
  "release_channel",
- "schemars",
+ "schemars 1.0.4",
  "serde",
  "serde_json",
  "settings",
@@ -9593,7 +9668,7 @@ dependencies = [
  "http_client",
  "language_model",
  "open_ai",
- "schemars",
+ "schemars 1.0.4",
  "semver",
  "serde",
  "serde_json",
@@ -10112,7 +10187,7 @@ dependencies = [
  "anyhow",
  "futures 0.3.32",
  "http_client",
- "schemars",
+ "schemars 1.0.4",
  "serde",
  "serde_json",
 ]
@@ -10189,7 +10264,7 @@ dependencies = [
  "parking_lot",
  "postage",
  "release_channel",
- "schemars",
+ "schemars 1.0.4",
  "semver",
  "serde",
  "serde_json",
@@ -10771,7 +10846,7 @@ dependencies = [
  "anyhow",
  "futures 0.3.32",
  "http_client",
- "schemars",
+ "schemars 1.0.4",
  "serde",
  "serde_json",
  "strum 0.27.2",
@@ -10868,7 +10943,7 @@ dependencies = [
  "half",
  "hashbrown 0.16.1",
  "hexf-parse",
- "indexmap",
+ "indexmap 2.11.4",
  "libm",
  "log",
  "num-traits",
@@ -10892,7 +10967,7 @@ dependencies = [
  "half",
  "hashbrown 0.16.1",
  "hexf-parse",
- "indexmap",
+ "indexmap 2.11.4",
  "libm",
  "log",
  "num-traits",
@@ -11590,7 +11665,7 @@ checksum = "ff76201f031d8863c38aa7f905eca4f53abbfa15f609db4277d44cd8938f33fe"
 dependencies = [
  "crc32fast",
  "hashbrown 0.15.5",
- "indexmap",
+ "indexmap 2.11.4",
  "memchr",
 ]
 
@@ -11643,7 +11718,7 @@ dependencies = [
  "anyhow",
  "futures 0.3.32",
  "http_client",
- "schemars",
+ "schemars 1.0.4",
  "serde",
  "serde_json",
  "settings",
@@ -11667,7 +11742,7 @@ dependencies = [
  "notifications",
  "picker",
  "project",
- "schemars",
+ "schemars 1.0.4",
  "serde",
  "settings",
  "telemetry",
@@ -11757,7 +11832,7 @@ dependencies = [
  "log",
  "pretty_assertions",
  "rand 0.9.3",
- "schemars",
+ "schemars 1.0.4",
  "serde",
  "serde_json",
  "strum 0.27.2",
@@ -11775,7 +11850,7 @@ dependencies = [
  "gpui",
  "picker",
  "project",
- "schemars",
+ "schemars 1.0.4",
  "serde",
  "serde_json",
  "settings",
@@ -11794,7 +11869,7 @@ dependencies = [
  "futures 0.3.32",
  "http_client",
  "language_model_core",
- "schemars",
+ "schemars 1.0.4",
  "serde",
  "serde_json",
  "settings",
@@ -11810,7 +11885,7 @@ dependencies = [
  "futures 0.3.32",
  "google_ai",
  "http_client",
- "schemars",
+ "schemars 1.0.4",
  "serde",
  "serde_json",
  "strum 0.27.2",
@@ -12127,6 +12202,12 @@ version = "0.1.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "35fb2e5f958ec131621fdd531e9fc186ed768cbe395337403ae56c17a74c68ec"
 
+[[package]]
+name = "pastey"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b867cad97c0791bbd3aaa6472142568c6c9e8f71937e98379f584cfb0cf35bec"
+
 [[package]]
 name = "pathdiff"
 version = "0.2.3"
@@ -12747,7 +12828,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db"
 dependencies = [
  "fixedbitset 0.4.2",
- "indexmap",
+ "indexmap 2.11.4",
 ]
 
 [[package]]
@@ -12861,7 +12942,7 @@ dependencies = [
  "editor",
  "gpui",
  "menu",
- "schemars",
+ "schemars 1.0.4",
  "serde",
  "settings",
  "theme",
@@ -12987,7 +13068,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "740ebea15c5d1428f910cd1a5f52cebf8d25006245ed8ade92702f4943d91e07"
 dependencies = [
  "base64 0.22.1",
- "indexmap",
+ "indexmap 2.11.4",
  "quick-xml 0.38.3",
  "serde",
  "time",
@@ -13369,7 +13450,7 @@ dependencies = [
  "gpui",
  "http_client",
  "image",
- "indexmap",
+ "indexmap 2.11.4",
  "itertools 0.14.0",
  "language",
  "log",
@@ -13388,7 +13469,7 @@ dependencies = [
  "release_channel",
  "remote",
  "rpc",
- "schemars",
+ "schemars 1.0.4",
  "semver",
  "serde",
  "serde_json",
@@ -13467,7 +13548,7 @@ dependencies = [
  "project",
  "rayon",
  "remote_connection",
- "schemars",
+ "schemars 1.0.4",
  "search",
  "serde",
  "serde_json",
@@ -14468,7 +14549,7 @@ dependencies = [
  "prost 0.9.0",
  "release_channel",
  "rpc",
- "schemars",
+ "schemars 1.0.4",
  "semver",
  "serde",
  "serde_json",
@@ -14824,6 +14905,41 @@ dependencies = [
  "syn 1.0.109",
 ]
 
+[[package]]
+name = "rmcp"
+version = "1.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2231b2c085b371c01bc90c0e6c1cab8834711b6394533375bdbf870b0166d419"
+dependencies = [
+ "async-trait",
+ "base64 0.22.1",
+ "chrono",
+ "futures 0.3.32",
+ "pastey 0.2.1",
+ "pin-project-lite",
+ "rmcp-macros",
+ "schemars 1.0.4",
+ "serde",
+ "serde_json",
+ "thiserror 2.0.17",
+ "tokio",
+ "tokio-util",
+ "tracing",
+]
+
+[[package]]
+name = "rmcp-macros"
+version = "1.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "36ea0e100fadf81be85d7ff70f86cd805c7572601d4ab2946207f36540854b43"
+dependencies = [
+ "darling 0.23.0",
+ "proc-macro2",
+ "quote",
+ "serde_json",
+ "syn 2.0.117",
+]
+
 [[package]]
 name = "rmp"
 version = "0.8.14"
@@ -15385,7 +15501,7 @@ dependencies = [
  "anyhow",
  "clap",
  "env_logger 0.11.8",
- "schemars",
+ "schemars 1.0.4",
  "serde",
  "serde_json",
  "settings",
@@ -15393,14 +15509,27 @@ dependencies = [
  "theme_settings",
 ]
 
+[[package]]
+name = "schemars"
+version = "0.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4cd191f9397d57d581cddd31014772520aa448f65ef991055d7f61582c65165f"
+dependencies = [
+ "dyn-clone",
+ "ref-cast",
+ "serde",
+ "serde_json",
+]
+
 [[package]]
 name = "schemars"
 version = "1.0.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "82d20c4491bc164fa2f6c5d44565947a52ad80b9505d8e36f8d54c27c739fcd0"
 dependencies = [
+ "chrono",
  "dyn-clone",
- "indexmap",
+ "indexmap 2.11.4",
  "ref-cast",
  "schemars_derive",
  "serde",
@@ -15772,7 +15901,7 @@ version = "1.0.145"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "402a6f66d8c709116cf22f558eab210f5a50187f702eb4d7e5ef38d9a7f1c79c"
 dependencies = [
- "indexmap",
+ "indexmap 2.11.4",
  "itoa",
  "memchr",
  "ryu",
@@ -15786,7 +15915,7 @@ version = "0.2.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "0e033097bf0d2b59a62b42c18ebbb797503839b26afdda2c4e1415cb6c813540"
 dependencies = [
- "indexmap",
+ "indexmap 2.11.4",
  "itoa",
  "memchr",
  "ryu",
@@ -15845,13 +15974,44 @@ dependencies = [
  "serde",
 ]
 
+[[package]]
+name = "serde_with"
+version = "3.18.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dd5414fad8e6907dbdd5bc441a50ae8d6e26151a03b1de04d89a5576de61d01f"
+dependencies = [
+ "base64 0.22.1",
+ "chrono",
+ "hex",
+ "indexmap 1.9.3",
+ "indexmap 2.11.4",
+ "schemars 0.9.0",
+ "schemars 1.0.4",
+ "serde_core",
+ "serde_json",
+ "serde_with_macros",
+ "time",
+]
+
+[[package]]
+name = "serde_with_macros"
+version = "3.18.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d3db8978e608f1fe7357e211969fd9abdcae80bac1ba7a3369bb7eb6b404eb65"
+dependencies = [
+ "darling 0.23.0",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.117",
+]
+
 [[package]]
 name = "serde_yaml"
 version = "0.9.34+deprecated"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47"
 dependencies = [
- "indexmap",
+ "indexmap 2.11.4",
  "itoa",
  "ryu",
  "serde",
@@ -15898,7 +16058,7 @@ dependencies = [
  "pretty_assertions",
  "release_channel",
  "rust-embed",
- "schemars",
+ "schemars 1.0.4",
  "serde",
  "serde_json",
  "serde_json_lenient",
@@ -15921,7 +16081,7 @@ dependencies = [
  "gpui",
  "language_model_core",
  "log",
- "schemars",
+ "schemars 1.0.4",
  "serde",
  "serde_json",
  "serde_json_lenient",
@@ -16009,7 +16169,7 @@ dependencies = [
  "regex",
  "release_channel",
  "rodio",
- "schemars",
+ "schemars 1.0.4",
  "search",
  "serde",
  "serde_json",
@@ -16399,7 +16559,7 @@ dependencies = [
  "indoc",
  "parking_lot",
  "paths",
- "schemars",
+ "schemars 1.0.4",
  "serde",
  "serde_json",
  "serde_json_lenient",
@@ -16570,7 +16730,7 @@ dependencies = [
  "futures-util",
  "hashbrown 0.15.5",
  "hashlink 0.10.0",
- "indexmap",
+ "indexmap 2.11.4",
  "log",
  "memchr",
  "once_cell",
@@ -17428,7 +17588,7 @@ dependencies = [
  "menu",
  "picker",
  "project",
- "schemars",
+ "schemars 1.0.4",
  "serde",
  "serde_json",
  "settings",
@@ -17501,7 +17661,7 @@ dependencies = [
  "parking_lot",
  "pretty_assertions",
  "proto",
- "schemars",
+ "schemars 1.0.4",
  "serde",
  "serde_json",
  "serde_json_lenient",
@@ -17605,7 +17765,7 @@ dependencies = [
  "rand 0.9.3",
  "regex",
  "release_channel",
- "schemars",
+ "schemars 1.0.4",
  "serde",
  "settings",
  "smol",
@@ -17654,7 +17814,7 @@ dependencies = [
  "release_channel",
  "remote",
  "rpc",
- "schemars",
+ "schemars 1.0.4",
  "semver",
  "serde",
  "serde_json",
@@ -17701,7 +17861,7 @@ dependencies = [
  "palette",
  "parking_lot",
  "refineable",
- "schemars",
+ "schemars 1.0.4",
  "serde",
  "serde_json",
  "serde_json_lenient",
@@ -17731,7 +17891,7 @@ dependencies = [
  "clap",
  "collections",
  "gpui",
- "indexmap",
+ "indexmap 2.11.4",
  "log",
  "palette",
  "serde",
@@ -17778,7 +17938,7 @@ dependencies = [
  "log",
  "palette",
  "refineable",
- "schemars",
+ "schemars 1.0.4",
  "serde",
  "serde_json",
  "serde_json_lenient",
@@ -18001,7 +18161,7 @@ dependencies = [
  "remote",
  "remote_connection",
  "rpc",
- "schemars",
+ "schemars 1.0.4",
  "semver",
  "serde",
  "settings",
@@ -18189,7 +18349,7 @@ version = "0.9.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "f0dc8b1fb61449e27716ec0e1bdf0f6b8f3e8f6b05391e8497b8b6d7804ea6d8"
 dependencies = [
- "indexmap",
+ "indexmap 2.11.4",
  "serde_core",
  "serde_spanned 1.0.3",
  "toml_datetime 0.7.3",
@@ -18222,7 +18382,7 @@ version = "0.22.27"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "41fe8c660ae4257887cf66394862d21dbca4a6ddd26f04a3560410406a2f819a"
 dependencies = [
- "indexmap",
+ "indexmap 2.11.4",
  "serde",
  "serde_spanned 0.6.9",
  "toml_datetime 0.6.11",
@@ -18236,7 +18396,7 @@ version = "0.23.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "6485ef6d0d9b5d0ec17244ff7eb05310113c3f316f2d14200d4de56b3cb98f8d"
 dependencies = [
- "indexmap",
+ "indexmap 2.11.4",
  "toml_datetime 0.7.3",
  "toml_parser",
  "winnow",
@@ -18919,7 +19079,7 @@ dependencies = [
  "icons",
  "itertools 0.14.0",
  "menu",
- "schemars",
+ "schemars 1.0.4",
  "serde",
  "smallvec",
  "strum 0.27.2",
@@ -19181,7 +19341,7 @@ dependencies = [
  "rand 0.9.3",
  "regex",
  "rust-embed",
- "schemars",
+ "schemars 1.0.4",
  "serde",
  "serde_json",
  "serde_json_lenient",
@@ -19293,7 +19453,7 @@ name = "vercel"
 version = "0.1.0"
 dependencies = [
  "anyhow",
- "schemars",
+ "schemars 1.0.4",
  "serde",
  "strum 0.27.2",
 ]
@@ -19344,7 +19504,7 @@ dependencies = [
  "project_panel",
  "regex",
  "release_channel",
- "schemars",
+ "schemars 1.0.4",
  "search",
  "semver",
  "serde",
@@ -19628,7 +19788,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "0fd83062c17b9f4985d438603cde0a5e8c5c8198201a6937f778b607924c7da2"
 dependencies = [
  "anyhow",
- "indexmap",
+ "indexmap 2.11.4",
  "serde",
  "serde_derive",
  "serde_json",
@@ -19646,7 +19806,7 @@ dependencies = [
  "anyhow",
  "auditable-serde",
  "flate2",
- "indexmap",
+ "indexmap 2.11.4",
  "serde",
  "serde_derive",
  "serde_json",

Cargo.toml 🔗

@@ -496,7 +496,7 @@ ztracing_macro = { path = "crates/ztracing_macro" }
 # External crates
 #
 
-agent-client-protocol = { version = "=0.10.2", features = ["unstable"] }
+agent-client-protocol = { version = "=0.11.1", features = ["unstable"] }
 aho-corasick = "1.1"
 alacritty_terminal = { git = "https://github.com/zed-industries/alacritty", rev = "9d9640d4" }
 any_vec = "0.14"

crates/acp_thread/src/acp_thread.rs 🔗

@@ -3,7 +3,7 @@ mod diff;
 mod mention;
 mod terminal;
 use action_log::{ActionLog, ActionLogTelemetry};
-use agent_client_protocol::{self as acp};
+use agent_client_protocol::schema as acp;
 use anyhow::{Context as _, Result, anyhow};
 use collections::HashSet;
 pub use connection::*;

crates/acp_thread/src/connection.rs 🔗

@@ -1,5 +1,5 @@
 use crate::AcpThread;
-use agent_client_protocol::{self as acp};
+use agent_client_protocol::schema as acp;
 use anyhow::Result;
 use chrono::{DateTime, Utc};
 use collections::{HashMap, IndexMap};
@@ -954,7 +954,7 @@ mod test_support {
 
         fn truncate(
             &self,
-            _session_id: &agent_client_protocol::SessionId,
+            _session_id: &acp::SessionId,
             _cx: &App,
         ) -> Option<Rc<dyn AgentSessionTruncate>> {
             Some(Rc::new(StubAgentSessionEditor))

crates/acp_thread/src/mention.rs 🔗

@@ -1,4 +1,4 @@
-use agent_client_protocol as acp;
+use agent_client_protocol::schema as acp;
 use anyhow::{Context as _, Result, bail};
 use file_icons::FileIcons;
 use prompt_store::{PromptId, UserPromptId};

crates/acp_thread/src/terminal.rs 🔗

@@ -1,4 +1,4 @@
-use agent_client_protocol as acp;
+use agent_client_protocol::schema as acp;
 use anyhow::Result;
 use futures::{FutureExt as _, future::Shared};
 use gpui::{App, AppContext, AsyncApp, Context, Entity, Task};

crates/acp_tools/Cargo.toml 🔗

@@ -13,15 +13,20 @@ workspace = true
 path = "src/acp_tools.rs"
 doctest = false
 
+[features]
+test-support = ["workspace/test-support"]
+
 [dependencies]
 agent-client-protocol.workspace = true
 collections.workspace = true
 gpui.workspace = true
 language.workspace= true
+log.workspace = true
 markdown.workspace = true
 project.workspace = true
 serde.workspace = true
 serde_json.workspace = true
+smol.workspace = true
 settings.workspace = true
 theme_settings.workspace = true
 ui.workspace = true

crates/acp_tools/src/acp_tools.rs 🔗

@@ -1,12 +1,13 @@
 use std::{
-    cell::RefCell,
-    collections::HashSet,
+    collections::{HashSet, VecDeque},
     fmt::Display,
-    rc::{Rc, Weak},
-    sync::Arc,
+    sync::{
+        Arc,
+        atomic::{AtomicBool, Ordering},
+    },
 };
 
-use agent_client_protocol as acp;
+use agent_client_protocol::schema as acp;
 use collections::HashMap;
 use gpui::{
     App, Empty, Entity, EventEmitter, FocusHandle, Focusable, Global, ListAlignment, ListState,
@@ -23,6 +24,111 @@ use workspace::{
     Item, ItemHandle, ToolbarItemEvent, ToolbarItemLocation, ToolbarItemView, Workspace,
 };
 
+#[derive(Clone, Copy, PartialEq, Eq)]
+pub enum StreamMessageDirection {
+    Incoming,
+    Outgoing,
+    /// Lines captured from the agent's stderr. These are not part of the
+    /// JSON-RPC protocol, but agents often emit useful diagnostics there.
+    Stderr,
+}
+
+#[derive(Clone)]
+pub enum StreamMessageContent {
+    Request {
+        id: acp::RequestId,
+        method: Arc<str>,
+        params: Option<serde_json::Value>,
+    },
+    Response {
+        id: acp::RequestId,
+        result: Result<Option<serde_json::Value>, acp::Error>,
+    },
+    Notification {
+        method: Arc<str>,
+        params: Option<serde_json::Value>,
+    },
+    /// A raw stderr line from the agent process.
+    Stderr { line: Arc<str> },
+}
+
+#[derive(Clone)]
+pub struct StreamMessage {
+    pub direction: StreamMessageDirection,
+    pub message: StreamMessageContent,
+}
+
+impl StreamMessage {
+    /// Build a `StreamMessage` from a raw line captured off the transport.
+    ///
+    /// For `Stderr`, the line is wrapped as-is (no JSON parsing). For
+    /// `Incoming`/`Outgoing`, the line is parsed as JSON-RPC; returns `None`
+    /// if it doesn't look like a valid JSON-RPC message.
+    pub fn from_raw_line(direction: StreamMessageDirection, line: &str) -> Option<Self> {
+        if direction == StreamMessageDirection::Stderr {
+            return Some(StreamMessage {
+                direction,
+                message: StreamMessageContent::Stderr {
+                    line: Arc::from(line),
+                },
+            });
+        }
+
+        let value: serde_json::Value = serde_json::from_str(line).ok()?;
+        let obj = value.as_object()?;
+
+        let parsed_id = obj
+            .get("id")
+            .map(|raw| serde_json::from_value::<acp::RequestId>(raw.clone()));
+
+        let message = if let Some(method) = obj.get("method").and_then(|m| m.as_str()) {
+            match parsed_id {
+                Some(Ok(id)) => StreamMessageContent::Request {
+                    id,
+                    method: method.into(),
+                    params: obj.get("params").cloned(),
+                },
+                Some(Err(err)) => {
+                    log::warn!("Skipping JSON-RPC message with unparsable id: {err}");
+                    return None;
+                }
+                None => StreamMessageContent::Notification {
+                    method: method.into(),
+                    params: obj.get("params").cloned(),
+                },
+            }
+        } else if let Some(parsed_id) = parsed_id {
+            let id = match parsed_id {
+                Ok(id) => id,
+                Err(err) => {
+                    log::warn!("Skipping JSON-RPC response with unparsable id: {err}");
+                    return None;
+                }
+            };
+            if let Some(error) = obj.get("error") {
+                let acp_err =
+                    serde_json::from_value::<acp::Error>(error.clone()).unwrap_or_else(|err| {
+                        log::warn!("Failed to deserialize ACP error: {err}");
+                        acp::Error::internal_error().data(error.to_string())
+                    });
+                StreamMessageContent::Response {
+                    id,
+                    result: Err(acp_err),
+                }
+            } else {
+                StreamMessageContent::Response {
+                    id,
+                    result: Ok(obj.get("result").cloned()),
+                }
+            }
+        } else {
+            return None;
+        };
+
+        Some(StreamMessage { direction, message })
+    }
+}
+
 actions!(dev, [OpenAcpLogs]);
 
 pub fn init(cx: &mut App) {
@@ -42,14 +148,87 @@ struct GlobalAcpConnectionRegistry(Entity<AcpConnectionRegistry>);
 
 impl Global for GlobalAcpConnectionRegistry {}
 
-#[derive(Default)]
-pub struct AcpConnectionRegistry {
-    active_connection: RefCell<Option<ActiveConnection>>,
+/// A raw line captured from the transport (or from stderr), tagged with
+/// direction. Deserialization into [`StreamMessage`] happens on the
+/// registry's foreground task so the ring buffer can be replayed to late
+/// subscribers.
+struct RawStreamLine {
+    direction: StreamMessageDirection,
+    line: Arc<str>,
 }
 
-struct ActiveConnection {
-    agent_id: AgentId,
-    connection: Weak<acp::ClientSideConnection>,
+/// Handle to an ACP connection's log tap. Passed back by
+/// [`AcpConnectionRegistry::set_active_connection`] so that the connection
+/// can publish transport and stderr lines without knowing anything about
+/// the logs panel's channel.
+///
+/// The tap carries a shared `enabled` flag that the registry flips on when
+/// the first observer subscribes. Until then, `emit_*` methods are
+/// effectively free: they check an atomic and return. This keeps the
+/// logs panel's memory footprint opt-in — if no one ever opens it, the
+/// transport never allocates a line or pushes a channel item.
+#[derive(Clone)]
+pub struct AcpLogTap {
+    enabled: Arc<AtomicBool>,
+    sender: smol::channel::Sender<RawStreamLine>,
+}
+
+impl AcpLogTap {
+    fn is_enabled(&self) -> bool {
+        self.enabled.load(Ordering::Relaxed)
+    }
+
+    fn enable(&self) {
+        self.enabled.store(true, Ordering::Relaxed);
+    }
+
+    fn emit(&self, direction: StreamMessageDirection, line: &str) {
+        if !self.is_enabled() {
+            return;
+        }
+        self.sender
+            .try_send(RawStreamLine {
+                direction,
+                line: Arc::from(line),
+            })
+            .log_err();
+    }
+
+    /// Record a line read from the agent's stdout.
+    pub fn emit_incoming(&self, line: &str) {
+        self.emit(StreamMessageDirection::Incoming, line);
+    }
+
+    /// Record a line written to the agent's stdin.
+    pub fn emit_outgoing(&self, line: &str) {
+        self.emit(StreamMessageDirection::Outgoing, line);
+    }
+
+    /// Record a line read from the agent's stderr.
+    pub fn emit_stderr(&self, line: &str) {
+        self.emit(StreamMessageDirection::Stderr, line);
+    }
+}
+
+/// Maximum number of messages retained in the registry's backlog.
+///
+/// Mirrors `MAX_STORED_LOG_ENTRIES` in the LSP log store, so that opening the
+/// ACP logs panel after a session has been running for a while still shows
+/// meaningful history.
+const MAX_BACKLOG_MESSAGES: usize = 2000;
+
+#[derive(Default)]
+pub struct AcpConnectionRegistry {
+    active_agent_id: Option<AgentId>,
+    generation: u64,
+    /// Bounded ring buffer of every message observed on the current connection.
+    /// When a new connection is set, this is cleared.
+    backlog: VecDeque<StreamMessage>,
+    subscribers: Vec<smol::channel::Sender<StreamMessage>>,
+    /// The tap handed to the currently active connection, so the registry
+    /// can flip its `enabled` flag the first time someone subscribes.
+    active_tap: Option<AcpLogTap>,
+    _broadcast_task: Option<Task<()>>,
 }
 
 impl AcpConnectionRegistry {
@@ -63,17 +242,94 @@ impl AcpConnectionRegistry {
         }
     }
 
+    /// Register a new active connection and return an [`AcpLogTap`] that
+    /// the connection should hand to its transport + stderr readers.
+    ///
+    /// The tap starts out disabled: transport lines are dropped cheaply
+    /// until someone subscribes via [`Self::subscribe`], at which point
+    /// the tap is flipped on and subsequent lines are broadcast to all
+    /// current and future subscribers.
     pub fn set_active_connection(
-        &self,
+        &mut self,
         agent_id: AgentId,
-        connection: &Rc<acp::ClientSideConnection>,
         cx: &mut Context<Self>,
-    ) {
-        self.active_connection.replace(Some(ActiveConnection {
-            agent_id,
-            connection: Rc::downgrade(connection),
+    ) -> AcpLogTap {
+        let (sender, raw_rx) = smol::channel::unbounded::<RawStreamLine>();
+        let tap = AcpLogTap {
+            enabled: Arc::new(AtomicBool::new(false)),
+            sender,
+        };
+
+        self.active_agent_id = Some(agent_id);
+        self.generation += 1;
+        self.backlog.clear();
+        self.subscribers.clear();
+        self.active_tap = Some(tap.clone());
+
+        self._broadcast_task = Some(cx.spawn(async move |this, cx| {
+            while let Ok(raw) = raw_rx.recv().await {
+                this.update(cx, |this, _cx| {
+                    let Some(message) = StreamMessage::from_raw_line(raw.direction, &raw.line)
+                    else {
+                        return;
+                    };
+
+                    if this.backlog.len() == MAX_BACKLOG_MESSAGES {
+                        this.backlog.pop_front();
+                    }
+                    this.backlog.push_back(message.clone());
+
+                    this.subscribers.retain(|sender| !sender.is_closed());
+                    for sender in &this.subscribers {
+                        sender.try_send(message.clone()).log_err();
+                    }
+                })
+                .log_err();
+            }
+
+            // The transport closed — clear state so observers (e.g. the ACP
+            // logs tab) can transition back to the disconnected state.
+            this.update(cx, |this, cx| {
+                this.active_agent_id = None;
+                this.subscribers.clear();
+                this.active_tap = None;
+                cx.notify();
+            })
+            .log_err();
         }));
+
         cx.notify();
+        tap
+    }
+
+    /// Clear the retained message history for the current connection and force
+    /// watchers to resubscribe so their local correlation state is reset too.
+    pub fn clear_messages(&mut self, cx: &mut Context<Self>) {
+        self.backlog.clear();
+        self.generation += 1;
+        self.subscribers.clear();
+        cx.notify();
+    }
+
+    /// Subscribe to messages on the current connection.
+    ///
+    /// Returns the existing backlog (already-observed messages) together with
+    /// a receiver for new messages. The caller is responsible for flushing the
+    /// backlog into its local state before draining the receiver, so that no
+    /// messages are dropped between the snapshot and live subscription.
+    ///
+    /// The first subscription enables the connection's log tap; prior
+    /// messages are therefore not available. This is intentional: the tap
+    /// is opt-in so that the default case (no one ever opens the ACP logs
+    /// panel) performs zero per-message bookkeeping.
+    pub fn subscribe(&mut self) -> (Vec<StreamMessage>, smol::channel::Receiver<StreamMessage>) {
+        if let Some(tap) = &self.active_tap {
+            tap.enable();
+        }
+        let backlog = self.backlog.iter().cloned().collect();
+        let (sender, receiver) = smol::channel::unbounded();
+        self.subscribers.push(sender);
+        (backlog, receiver)
     }
 }
 
@@ -88,9 +344,9 @@ struct AcpTools {
 
 struct WatchedConnection {
     agent_id: AgentId,
+    generation: u64,
     messages: Vec<WatchedConnectionMessage>,
     list_state: ListState,
-    connection: Weak<acp::ClientSideConnection>,
     incoming_request_methods: HashMap<acp::RequestId, Arc<str>>,
     outgoing_request_methods: HashMap<acp::RequestId, Arc<str>>,
     _task: Task<()>,
@@ -118,44 +374,54 @@ impl AcpTools {
     }
 
     fn update_connection(&mut self, cx: &mut Context<Self>) {
-        let active_connection = self.connection_registry.read(cx).active_connection.borrow();
-        let Some(active_connection) = active_connection.as_ref() else {
+        let (generation, agent_id) = {
+            let registry = self.connection_registry.read(cx);
+            (registry.generation, registry.active_agent_id.clone())
+        };
+
+        let Some(agent_id) = agent_id else {
+            self.watched_connection = None;
+            self.expanded.clear();
             return;
         };
 
-        if let Some(watched_connection) = self.watched_connection.as_ref() {
-            if Weak::ptr_eq(
-                &watched_connection.connection,
-                &active_connection.connection,
-            ) {
+        if let Some(watched) = self.watched_connection.as_ref() {
+            if watched.generation == generation {
                 return;
             }
         }
 
-        if let Some(connection) = active_connection.connection.upgrade() {
-            let mut receiver = connection.subscribe();
-            let task = cx.spawn(async move |this, cx| {
-                while let Ok(message) = receiver.recv().await {
-                    this.update(cx, |this, cx| {
-                        this.push_stream_message(message, cx);
-                    })
-                    .ok();
-                }
-            });
+        self.expanded.clear();
 
-            self.watched_connection = Some(WatchedConnection {
-                agent_id: active_connection.agent_id.clone(),
-                messages: vec![],
-                list_state: ListState::new(0, ListAlignment::Bottom, px(2048.)),
-                connection: active_connection.connection.clone(),
-                incoming_request_methods: HashMap::default(),
-                outgoing_request_methods: HashMap::default(),
-                _task: task,
-            });
+        let (backlog, messages_rx) = self
+            .connection_registry
+            .update(cx, |registry, _cx| registry.subscribe());
+
+        let task = cx.spawn(async move |this, cx| {
+            while let Ok(message) = messages_rx.recv().await {
+                this.update(cx, |this, cx| {
+                    this.push_stream_message(message, cx);
+                })
+                .log_err();
+            }
+        });
+
+        self.watched_connection = Some(WatchedConnection {
+            agent_id,
+            generation,
+            messages: vec![],
+            list_state: ListState::new(0, ListAlignment::Bottom, px(2048.)),
+            incoming_request_methods: HashMap::default(),
+            outgoing_request_methods: HashMap::default(),
+            _task: task,
+        });
+
+        for message in backlog {
+            self.push_stream_message(message, cx);
         }
     }
 
-    fn push_stream_message(&mut self, stream_message: acp::StreamMessage, cx: &mut Context<Self>) {
+    fn push_stream_message(&mut self, stream_message: StreamMessage, cx: &mut Context<Self>) {
         let Some(connection) = self.watched_connection.as_mut() else {
             return;
         };
@@ -163,27 +429,22 @@ impl AcpTools {
         let index = connection.messages.len();
 
         let (request_id, method, message_type, params) = match stream_message.message {
-            acp::StreamMessageContent::Request { id, method, params } => {
+            StreamMessageContent::Request { id, method, params } => {
                 let method_map = match stream_message.direction {
-                    acp::StreamMessageDirection::Incoming => {
-                        &mut connection.incoming_request_methods
-                    }
-                    acp::StreamMessageDirection::Outgoing => {
-                        &mut connection.outgoing_request_methods
-                    }
+                    StreamMessageDirection::Incoming => &mut connection.incoming_request_methods,
+                    StreamMessageDirection::Outgoing => &mut connection.outgoing_request_methods,
+                    // Stderr lines never carry request/response correlation.
+                    StreamMessageDirection::Stderr => return,
                 };
 
                 method_map.insert(id.clone(), method.clone());
                 (Some(id), method.into(), MessageType::Request, Ok(params))
             }
-            acp::StreamMessageContent::Response { id, result } => {
+            StreamMessageContent::Response { id, result } => {
                 let method_map = match stream_message.direction {
-                    acp::StreamMessageDirection::Incoming => {
-                        &mut connection.outgoing_request_methods
-                    }
-                    acp::StreamMessageDirection::Outgoing => {
-                        &mut connection.incoming_request_methods
-                    }
+                    StreamMessageDirection::Incoming => &mut connection.outgoing_request_methods,
+                    StreamMessageDirection::Outgoing => &mut connection.incoming_request_methods,
+                    StreamMessageDirection::Stderr => return,
                 };
 
                 if let Some(method) = method_map.remove(&id) {
@@ -197,9 +458,20 @@ impl AcpTools {
                     )
                 }
             }
-            acp::StreamMessageContent::Notification { method, params } => {
+            StreamMessageContent::Notification { method, params } => {
                 (None, method.into(), MessageType::Notification, Ok(params))
             }
+            StreamMessageContent::Stderr { line } => {
+                // Stderr is rendered as plain text inline with JSON-RPC traffic,
+                // using `stderr` as the pseudo-method name so it shows up in the
+                // header the same way real methods do.
+                (
+                    None,
+                    "stderr".into(),
+                    MessageType::Stderr,
+                    Ok(Some(serde_json::Value::String(line.to_string()))),
+                )
+            }
         };
 
         let message = WatchedConnectionMessage {
@@ -243,8 +515,9 @@ impl AcpTools {
                 };
                 Some(serde_json::json!({
                     "_direction": match message.direction {
-                        acp::StreamMessageDirection::Incoming => "incoming",
-                        acp::StreamMessageDirection::Outgoing => "outgoing",
+                        StreamMessageDirection::Incoming => "incoming",
+                        StreamMessageDirection::Outgoing => "outgoing",
+                        StreamMessageDirection::Stderr => "stderr",
                     },
                     "_type": message.message_type.to_string().to_lowercase(),
                     "id": message.request_id,
@@ -261,6 +534,8 @@ impl AcpTools {
         if let Some(connection) = self.watched_connection.as_mut() {
             connection.messages.clear();
             connection.list_state.reset(0);
+            connection.incoming_request_methods.clear();
+            connection.outgoing_request_methods.clear();
             self.expanded.clear();
             cx.notify();
         }
@@ -326,12 +601,15 @@ impl AcpTools {
                         cx.notify()
                     }))
                     .child(match message.direction {
-                        acp::StreamMessageDirection::Incoming => Icon::new(IconName::ArrowDown)
+                        StreamMessageDirection::Incoming => Icon::new(IconName::ArrowDown)
                             .color(Color::Error)
                             .size(IconSize::Small),
-                        acp::StreamMessageDirection::Outgoing => Icon::new(IconName::ArrowUp)
+                        StreamMessageDirection::Outgoing => Icon::new(IconName::ArrowUp)
                             .color(Color::Success)
                             .size(IconSize::Small),
+                        StreamMessageDirection::Stderr => Icon::new(IconName::Warning)
+                            .color(Color::Warning)
+                            .size(IconSize::Small),
                     })
                     .child(
                         Label::new(message.name.clone())
@@ -403,7 +681,7 @@ impl AcpTools {
 struct WatchedConnectionMessage {
     name: SharedString,
     request_id: Option<acp::RequestId>,
-    direction: acp::StreamMessageDirection,
+    direction: StreamMessageDirection,
     message_type: MessageType,
     params: Result<Option<serde_json::Value>, acp::Error>,
     collapsed_params_md: Option<Entity<Markdown>>,
@@ -463,6 +741,7 @@ enum MessageType {
     Request,
     Response,
     Notification,
+    Stderr,
 }
 
 impl Display for MessageType {
@@ -471,6 +750,7 @@ impl Display for MessageType {
             MessageType::Request => write!(f, "Request"),
             MessageType::Response => write!(f, "Response"),
             MessageType::Notification => write!(f, "Notification"),
+            MessageType::Stderr => write!(f, "Stderr"),
         }
     }
 }
@@ -561,6 +841,7 @@ impl Render for AcpToolsToolbarItemView {
         };
 
         let acp_tools = acp_tools.clone();
+        let connection_registry = acp_tools.read(cx).connection_registry.clone();
         let has_messages = acp_tools
             .read(cx)
             .watched_connection
@@ -585,6 +866,9 @@ impl Render for AcpToolsToolbarItemView {
                     .tooltip(Tooltip::text("Clear Messages"))
                     .disabled(!has_messages)
                     .on_click(cx.listener(move |_this, _, _window, cx| {
+                        connection_registry.update(cx, |registry, cx| {
+                            registry.clear_messages(cx);
+                        });
                         acp_tools.update(cx, |acp_tools, cx| {
                             acp_tools.clear_messages(cx);
                         });

crates/agent/src/agent.rs 🔗

@@ -28,7 +28,7 @@ use acp_thread::{
     AcpThread, AgentModelSelector, AgentSessionInfo, AgentSessionList, AgentSessionListRequest,
     AgentSessionListResponse, TokenUsageRatio, UserMessageId,
 };
-use agent_client_protocol as acp;
+use agent_client_protocol::schema as acp;
 use anyhow::{Context as _, Result, anyhow};
 use chrono::{DateTime, Utc};
 use collections::{HashMap, HashSet, IndexMap};

crates/agent/src/db.rs 🔗

@@ -1,6 +1,6 @@
 use crate::{AgentMessage, AgentMessageContent, UserMessage, UserMessageContent};
 use acp_thread::UserMessageId;
-use agent_client_protocol as acp;
+use agent_client_protocol::schema as acp;
 use agent_settings::AgentProfileId;
 use anyhow::{Result, anyhow};
 use chrono::{DateTime, Utc};

crates/agent/src/native_agent_server.rs 🔗

@@ -1,6 +1,6 @@
 use std::{any::Any, rc::Rc, sync::Arc};
 
-use agent_client_protocol as acp;
+use agent_client_protocol::schema as acp;
 use agent_servers::{AgentServer, AgentServerDelegate};
 use agent_settings::{AgentSettings, language_model_to_selection};
 use anyhow::Result;

crates/agent/src/tests/mod.rs 🔗

@@ -3,7 +3,7 @@ use acp_thread::{
     AgentConnection, AgentModelGroupName, AgentModelList, PermissionOptions, ThreadStatus,
     UserMessageId,
 };
-use agent_client_protocol::{self as acp};
+use agent_client_protocol::schema as acp;
 use agent_settings::AgentProfileId;
 use anyhow::Result;
 use client::{Client, RefreshLlmTokenListener, UserStore};
@@ -5402,7 +5402,7 @@ async fn test_max_subagent_depth_prevents_tool_registration(cx: &mut TestAppCont
             cx,
         );
         thread.set_subagent_context(SubagentContext {
-            parent_thread_id: agent_client_protocol::SessionId::new("parent-id"),
+            parent_thread_id: acp::SessionId::new("parent-id"),
             depth: MAX_SUBAGENT_DEPTH - 1,
         });
         thread

crates/agent/src/thread.rs 🔗

@@ -12,7 +12,7 @@ use feature_flags::{
     FeatureFlagAppExt as _, StreamingEditFileToolFeatureFlag, UpdatePlanToolFeatureFlag,
 };
 
-use agent_client_protocol as acp;
+use agent_client_protocol::schema as acp;
 use agent_settings::{
     AgentProfileId, AgentSettings, SUMMARIZE_THREAD_DETAILED_PROMPT, SUMMARIZE_THREAD_PROMPT,
 };
@@ -3445,7 +3445,7 @@ where
         T::description()
     }
 
-    fn kind(&self) -> agent_client_protocol::ToolKind {
+    fn kind(&self) -> acp::ToolKind {
         T::kind()
     }
 

crates/agent/src/thread_store.rs 🔗

@@ -1,5 +1,5 @@
 use crate::{DbThread, DbThreadMetadata, ThreadsDatabase};
-use agent_client_protocol as acp;
+use agent_client_protocol::schema as acp;
 use anyhow::{Result, anyhow};
 use gpui::{App, Context, Entity, Global, Task, prelude::*};
 use util::path_list::PathList;

crates/agent/src/tools/context_server_registry.rs 🔗

@@ -1,5 +1,5 @@
 use crate::{AgentToolOutput, AnyAgentTool, ToolCallEventStream, ToolInput};
-use agent_client_protocol::ToolKind;
+use agent_client_protocol::schema as acp;
 use anyhow::Result;
 use collections::{BTreeMap, HashMap};
 use context_server::{ContextServerId, client::NotificationSubscription};
@@ -304,8 +304,8 @@ impl AnyAgentTool for ContextServerTool {
         self.tool.description.clone().unwrap_or_default().into()
     }
 
-    fn kind(&self) -> ToolKind {
-        ToolKind::Other
+    fn kind(&self) -> acp::ToolKind {
+        acp::ToolKind::Other
     }
 
     fn initial_title(&self, _input: serde_json::Value, _cx: &mut App) -> SharedString {

crates/agent/src/tools/copy_path_tool.rs 🔗

@@ -5,7 +5,7 @@ use super::tool_permissions::{
 use crate::{
     AgentTool, ToolCallEventStream, ToolInput, ToolPermissionDecision, decide_permission_for_paths,
 };
-use agent_client_protocol::ToolKind;
+use agent_client_protocol::schema as acp;
 use agent_settings::AgentSettings;
 use futures::FutureExt as _;
 use gpui::{App, Entity, Task};
@@ -61,8 +61,8 @@ impl AgentTool for CopyPathTool {
 
     const NAME: &'static str = "copy_path";
 
-    fn kind() -> ToolKind {
-        ToolKind::Move
+    fn kind() -> acp::ToolKind {
+        acp::ToolKind::Move
     }
 
     fn initial_title(
@@ -198,7 +198,6 @@ impl AgentTool for CopyPathTool {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use agent_client_protocol as acp;
     use fs::Fs as _;
     use gpui::TestAppContext;
     use project::{FakeFs, Project};

crates/agent/src/tools/create_directory_tool.rs 🔗

@@ -2,7 +2,7 @@ use super::tool_permissions::{
     SensitiveSettingsKind, authorize_symlink_access, canonicalize_worktree_roots,
     detect_symlink_escape, sensitive_settings_kind,
 };
-use agent_client_protocol::ToolKind;
+use agent_client_protocol::schema as acp;
 use agent_settings::AgentSettings;
 use futures::FutureExt as _;
 use gpui::{App, Entity, SharedString, Task};
@@ -52,8 +52,8 @@ impl AgentTool for CreateDirectoryTool {
 
     const NAME: &'static str = "create_directory";
 
-    fn kind() -> ToolKind {
-        ToolKind::Read
+    fn kind() -> acp::ToolKind {
+        acp::ToolKind::Read
     }
 
     fn initial_title(
@@ -169,7 +169,6 @@ impl AgentTool for CreateDirectoryTool {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use agent_client_protocol as acp;
     use fs::Fs as _;
     use gpui::TestAppContext;
     use project::{FakeFs, Project};

crates/agent/src/tools/delete_path_tool.rs 🔗

@@ -6,7 +6,7 @@ use crate::{
     AgentTool, ToolCallEventStream, ToolInput, ToolPermissionDecision, decide_permission_for_path,
 };
 use action_log::ActionLog;
-use agent_client_protocol::ToolKind;
+use agent_client_protocol::schema as acp;
 use agent_settings::AgentSettings;
 use futures::{FutureExt as _, SinkExt, StreamExt, channel::mpsc};
 use gpui::{App, AppContext, Entity, SharedString, Task};
@@ -55,8 +55,8 @@ impl AgentTool for DeletePathTool {
 
     const NAME: &'static str = "delete_path";
 
-    fn kind() -> ToolKind {
-        ToolKind::Delete
+    fn kind() -> acp::ToolKind {
+        acp::ToolKind::Delete
     }
 
     fn initial_title(
@@ -228,7 +228,6 @@ impl AgentTool for DeletePathTool {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use agent_client_protocol as acp;
     use fs::Fs as _;
     use gpui::TestAppContext;
     use project::{FakeFs, Project};

crates/agent/src/tools/diagnostics_tool.rs 🔗

@@ -1,5 +1,5 @@
 use crate::{AgentTool, ToolCallEventStream, ToolInput};
-use agent_client_protocol as acp;
+use agent_client_protocol::schema as acp;
 use anyhow::Result;
 use futures::FutureExt as _;
 use gpui::{App, Entity, Task};

crates/agent/src/tools/edit_file_tool.rs 🔗

@@ -6,7 +6,7 @@ use crate::{
     edit_agent::{EditAgent, EditAgentOutputEvent, EditFormat},
 };
 use acp_thread::Diff;
-use agent_client_protocol::{self as acp, ToolCallLocation, ToolCallUpdateFields};
+use agent_client_protocol::schema as acp;
 use anyhow::{Context as _, Result};
 use collections::HashSet;
 use futures::{FutureExt as _, StreamExt as _};
@@ -260,7 +260,7 @@ impl AgentTool for EditFileTool {
                     let abs_path = project.read(cx).absolute_path(&project_path, cx);
                     if let Some(abs_path) = abs_path.clone() {
                         event_stream.update_fields(
-                            ToolCallUpdateFields::new()
+                            acp::ToolCallUpdateFields::new()
                                 .locations(vec![acp::ToolCallLocation::new(abs_path)]),
                         );
                     }
@@ -409,7 +409,7 @@ impl AgentTool for EditFileTool {
                                     range.start.to_point(&buffer.snapshot()).row
                                 }));
                                 if let Some(abs_path) = abs_path.clone() {
-                                    event_stream.update_fields(ToolCallUpdateFields::new().locations(vec![ToolCallLocation::new(abs_path).line(line)]));
+                                    event_stream.update_fields(acp::ToolCallUpdateFields::new().locations(vec![acp::ToolCallLocation::new(abs_path).line(line)]));
                                 }
                                 emitted_location = true;
                             }

crates/agent/src/tools/fetch_tool.rs 🔗

@@ -2,7 +2,7 @@ use std::rc::Rc;
 use std::sync::Arc;
 use std::{borrow::Cow, cell::RefCell};
 
-use agent_client_protocol as acp;
+use agent_client_protocol::schema as acp;
 use agent_settings::AgentSettings;
 use anyhow::{Context as _, Result, bail};
 use futures::{AsyncReadExt as _, FutureExt as _};

crates/agent/src/tools/find_path_tool.rs 🔗

@@ -1,5 +1,5 @@
 use crate::{AgentTool, ToolCallEventStream, ToolInput};
-use agent_client_protocol as acp;
+use agent_client_protocol::schema as acp;
 use anyhow::{Result, anyhow};
 use futures::FutureExt as _;
 use gpui::{App, AppContext, Entity, SharedString, Task};

crates/agent/src/tools/grep_tool.rs 🔗

@@ -1,5 +1,5 @@
 use crate::{AgentTool, ToolCallEventStream, ToolInput};
-use agent_client_protocol as acp;
+use agent_client_protocol::schema as acp;
 use anyhow::Result;
 use futures::{FutureExt as _, StreamExt};
 use gpui::{App, Entity, SharedString, Task};

crates/agent/src/tools/list_directory_tool.rs 🔗

@@ -3,7 +3,7 @@ use super::tool_permissions::{
     resolve_project_path,
 };
 use crate::{AgentTool, ToolCallEventStream, ToolInput};
-use agent_client_protocol::ToolKind;
+use agent_client_protocol::schema as acp;
 use anyhow::{Context as _, Result, anyhow};
 use gpui::{App, Entity, SharedString, Task};
 use project::{Project, ProjectPath, WorktreeSettings};
@@ -127,8 +127,8 @@ impl AgentTool for ListDirectoryTool {
 
     const NAME: &'static str = "list_directory";
 
-    fn kind() -> ToolKind {
-        ToolKind::Read
+    fn kind() -> acp::ToolKind {
+        acp::ToolKind::Read
     }
 
     fn initial_title(
@@ -267,7 +267,6 @@ impl AgentTool for ListDirectoryTool {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use agent_client_protocol as acp;
     use fs::Fs as _;
     use gpui::{TestAppContext, UpdateGlobal};
     use indoc::indoc;

crates/agent/src/tools/move_path_tool.rs 🔗

@@ -5,7 +5,7 @@ use super::tool_permissions::{
 use crate::{
     AgentTool, ToolCallEventStream, ToolInput, ToolPermissionDecision, decide_permission_for_paths,
 };
-use agent_client_protocol::ToolKind;
+use agent_client_protocol::schema as acp;
 use agent_settings::AgentSettings;
 use futures::FutureExt as _;
 use gpui::{App, Entity, SharedString, Task};
@@ -62,8 +62,8 @@ impl AgentTool for MovePathTool {
 
     const NAME: &'static str = "move_path";
 
-    fn kind() -> ToolKind {
-        ToolKind::Move
+    fn kind() -> acp::ToolKind {
+        acp::ToolKind::Move
     }
 
     fn initial_title(
@@ -205,7 +205,6 @@ impl AgentTool for MovePathTool {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use agent_client_protocol as acp;
     use fs::Fs as _;
     use gpui::TestAppContext;
     use project::{FakeFs, Project};

crates/agent/src/tools/now_tool.rs 🔗

@@ -1,6 +1,6 @@
 use std::sync::Arc;
 
-use agent_client_protocol as acp;
+use agent_client_protocol::schema as acp;
 use chrono::{Local, Utc};
 use gpui::{App, SharedString, Task};
 use schemars::JsonSchema;

crates/agent/src/tools/open_tool.rs 🔗

@@ -3,7 +3,7 @@ use super::tool_permissions::{
     resolve_project_path,
 };
 use crate::{AgentTool, ToolInput};
-use agent_client_protocol::ToolKind;
+use agent_client_protocol::schema as acp;
 use futures::FutureExt as _;
 use gpui::{App, AppContext as _, Entity, SharedString, Task};
 use project::Project;
@@ -43,8 +43,8 @@ impl AgentTool for OpenTool {
 
     const NAME: &'static str = "open";
 
-    fn kind() -> ToolKind {
-        ToolKind::Execute
+    fn kind() -> acp::ToolKind {
+        acp::ToolKind::Execute
     }
 
     fn initial_title(

crates/agent/src/tools/read_file_tool.rs 🔗

@@ -1,5 +1,5 @@
 use action_log::ActionLog;
-use agent_client_protocol::{self as acp, ToolCallUpdateFields};
+use agent_client_protocol::schema as acp;
 use anyhow::{Context as _, Result, anyhow};
 use futures::FutureExt as _;
 use gpui::{App, Entity, SharedString, Task};
@@ -200,7 +200,7 @@ impl AgentTool for ReadFileTool {
             let file_path = input.path.clone();
 
             cx.update(|_cx| {
-                event_stream.update_fields(ToolCallUpdateFields::new().locations(vec![
+                event_stream.update_fields(acp::ToolCallUpdateFields::new().locations(vec![
                     acp::ToolCallLocation::new(&abs_path)
                         .line(input.start_line.map(|line| line.saturating_sub(1))),
                 ]));
@@ -228,7 +228,7 @@ impl AgentTool for ReadFileTool {
                     .context("processing image")
                     .map_err(tool_content_err)?;
 
-                event_stream.update_fields(ToolCallUpdateFields::new().content(vec![
+                event_stream.update_fields(acp::ToolCallUpdateFields::new().content(vec![
                     acp::ToolCallContent::Content(acp::Content::new(acp::ContentBlock::Image(
                         acp::ImageContent::new(language_model_image.source.clone(), "image/png"),
                     ))),
@@ -333,7 +333,7 @@ impl AgentTool for ReadFileTool {
                         text,
                     }
                     .to_string();
-                    event_stream.update_fields(ToolCallUpdateFields::new().content(vec![
+                    event_stream.update_fields(acp::ToolCallUpdateFields::new().content(vec![
                         acp::ToolCallContent::Content(acp::Content::new(markdown)),
                     ]));
                 }
@@ -347,7 +347,6 @@ impl AgentTool for ReadFileTool {
 #[cfg(test)]
 mod test {
     use super::*;
-    use agent_client_protocol as acp;
     use fs::Fs as _;
     use gpui::{AppContext, TestAppContext, UpdateGlobal as _};
     use project::{FakeFs, Project};

crates/agent/src/tools/restore_file_from_disk_tool.rs 🔗

@@ -3,7 +3,7 @@ use super::tool_permissions::{
     canonicalize_worktree_roots, path_has_symlink_escape, resolve_project_path,
     sensitive_settings_kind,
 };
-use agent_client_protocol as acp;
+use agent_client_protocol::schema as acp;
 use agent_settings::AgentSettings;
 use collections::FxHashSet;
 use futures::FutureExt as _;

crates/agent/src/tools/save_file_tool.rs 🔗

@@ -1,4 +1,4 @@
-use agent_client_protocol as acp;
+use agent_client_protocol::schema as acp;
 use agent_settings::AgentSettings;
 use collections::FxHashSet;
 use futures::FutureExt as _;

crates/agent/src/tools/spawn_agent_tool.rs 🔗

@@ -1,5 +1,5 @@
 use acp_thread::{SUBAGENT_SESSION_INFO_META_KEY, SubagentSessionInfo};
-use agent_client_protocol as acp;
+use agent_client_protocol::schema as acp;
 use anyhow::Result;
 use gpui::{App, SharedString, Task};
 use language_model::LanguageModelToolResultContent;

crates/agent/src/tools/streaming_edit_file_tool.rs 🔗

@@ -12,7 +12,7 @@ use crate::{
 };
 use acp_thread::Diff;
 use action_log::ActionLog;
-use agent_client_protocol::{self as acp, ToolCallLocation, ToolCallUpdateFields};
+use agent_client_protocol::schema::{self as acp, ToolCallLocation, ToolCallUpdateFields};
 use anyhow::Result;
 use collections::HashSet;
 use futures::FutureExt as _;

crates/agent/src/tools/terminal_tool.rs 🔗

@@ -1,4 +1,4 @@
-use agent_client_protocol as acp;
+use agent_client_protocol::schema as acp;
 use agent_settings::AgentSettings;
 use anyhow::Result;
 use futures::FutureExt as _;

crates/agent/src/tools/update_plan_tool.rs 🔗

@@ -1,5 +1,5 @@
 use crate::{AgentTool, ToolCallEventStream, ToolInput};
-use agent_client_protocol as acp;
+use agent_client_protocol::schema as acp;
 use gpui::{App, SharedString, Task};
 use schemars::JsonSchema;
 use serde::{Deserialize, Serialize};

crates/agent/src/tools/web_search_tool.rs 🔗

@@ -4,7 +4,7 @@ use crate::{
     AgentTool, ToolCallEventStream, ToolInput, ToolPermissionDecision,
     decide_permission_from_settings,
 };
-use agent_client_protocol as acp;
+use agent_client_protocol::schema as acp;
 use agent_settings::AgentSettings;
 use anyhow::Result;
 use cloud_llm_client::WebSearchResponse;

crates/agent_servers/Cargo.toml 🔗

@@ -6,7 +6,7 @@ publish.workspace = true
 license = "GPL-3.0-or-later"
 
 [features]
-test-support = ["acp_thread/test-support", "gpui/test-support", "project/test-support", "dep:async-pipe", "dep:env_logger", "client/test-support", "dep:gpui_tokio", "reqwest_client/test-support"]
+test-support = ["acp_tools/test-support", "acp_thread/test-support", "gpui/test-support", "project/test-support", "dep:env_logger", "client/test-support", "dep:gpui_tokio", "reqwest_client/test-support"]
 e2e = []
 
 [lints]
@@ -22,8 +22,6 @@ acp_thread.workspace = true
 action_log.workspace = true
 agent-client-protocol.workspace = true
 anyhow.workspace = true
-async-pipe = { workspace = true, optional = true }
-async-trait.workspace = true
 chrono.workspace = true
 client.workspace = true
 collections.workspace = true
@@ -67,7 +65,6 @@ fs.workspace = true
 
 indoc.workspace = true
 acp_thread = { workspace = true, features = ["test-support"] }
-async-pipe.workspace = true
 gpui = { workspace = true, features = ["test-support"] }
 gpui_tokio.workspace = true
 project = { workspace = true, features = ["test-support"] }

crates/agent_servers/src/acp.rs 🔗

@@ -4,14 +4,17 @@ use acp_thread::{
 };
 use acp_tools::AcpConnectionRegistry;
 use action_log::ActionLog;
-use agent_client_protocol::{self as acp, Agent as _, ErrorCode};
+use agent_client_protocol::schema::{self as acp, ErrorCode};
+use agent_client_protocol::{
+    Agent, Client, ConnectionTo, JsonRpcResponse, Lines, Responder, SentRequest,
+};
 use anyhow::anyhow;
 use collections::HashMap;
 use feature_flags::{AcpBetaFeatureFlag, FeatureFlagAppExt as _};
-use futures::AsyncBufReadExt as _;
-use futures::FutureExt as _;
+use futures::channel::mpsc;
 use futures::future::Shared;
 use futures::io::BufReader;
+use futures::{AsyncBufReadExt as _, Future, FutureExt as _, StreamExt as _};
 use project::agent_server_store::{AgentServerCommand, AgentServerStore};
 use project::{AgentId, Project};
 use remote::remote_client::Interactive;
@@ -19,6 +22,7 @@ use serde::Deserialize;
 use std::path::PathBuf;
 use std::process::Stdio;
 use std::rc::Rc;
+use std::sync::Arc;
 use std::{any::Any, cell::RefCell};
 use task::{Shell, ShellBuilder, SpawnInTerminal};
 use thiserror::Error;
@@ -26,8 +30,6 @@ use util::ResultExt as _;
 use util::path_list::PathList;
 use util::process::Child;
 
-use std::sync::Arc;
-
 use anyhow::{Context as _, Result};
 use gpui::{App, AppContext as _, AsyncApp, Entity, SharedString, Task, WeakEntity};
 
@@ -39,14 +41,195 @@ use crate::GEMINI_ID;
 
 pub const GEMINI_TERMINAL_AUTH_METHOD_ID: &str = "spawn-gemini-cli";
 
+/// Awaits the response to an ACP request from a GPUI foreground task.
+///
+/// The ACP SDK offers two ways to consume a [`SentRequest`]:
+///   - [`SentRequest::block_task`]: linear `.await` inside a spawned task.
+///   - [`SentRequest::on_receiving_result`]: a callback invoked when the
+///     response arrives, with the guarantee that no other inbound messages
+///     are processed while the callback runs. This is the recommended form
+///     inside SDK handler callbacks, where [`block_task`] would deadlock.
+///
+/// We use `on_receiving_result` with a oneshot bridge here (rather than
+/// [`block_task`]) so that our handler-side code paths can share a single
+/// request-awaiting helper. The SDK callback itself is trivial (one channel
+/// send) so the extra ordering guarantee it imposes on the dispatch loop is
+/// negligible.
+fn into_foreground_future<T: JsonRpcResponse>(
+    sent: SentRequest<T>,
+) -> impl Future<Output = Result<T, acp::Error>> {
+    let (tx, rx) = futures::channel::oneshot::channel();
+    let spawn_result = sent.on_receiving_result(async move |result| {
+        tx.send(result).ok();
+        Ok(())
+    });
+    async move {
+        spawn_result?;
+        rx.await.map_err(|_| {
+            acp::Error::internal_error()
+                .data("response channel cancelled — connection may have dropped")
+        })?
+    }
+}
+
 #[derive(Debug, Error)]
 #[error("Unsupported version")]
 pub struct UnsupportedVersion;
 
+/// Helper for flattening the nested `Result` shapes that come out of
+/// `entity.update(cx, |_, cx| fallible_op(cx))` into a single `Result<T,
+/// acp::Error>`.
+///
+/// `anyhow::Error` values get converted via `acp::Error::from`, which
+/// downcasts an `acp::Error` back out of `anyhow` when present, so typed
+/// errors like auth-required survive the trip.
+trait FlattenAcpResult<T> {
+    fn flatten_acp(self) -> Result<T, acp::Error>;
+}
+
+impl<T> FlattenAcpResult<T> for Result<Result<T, anyhow::Error>, anyhow::Error> {
+    fn flatten_acp(self) -> Result<T, acp::Error> {
+        match self {
+            Ok(Ok(value)) => Ok(value),
+            Ok(Err(err)) => Err(err.into()),
+            Err(err) => Err(err.into()),
+        }
+    }
+}
+
+impl<T> FlattenAcpResult<T> for Result<Result<T, acp::Error>, anyhow::Error> {
+    fn flatten_acp(self) -> Result<T, acp::Error> {
+        match self {
+            Ok(Ok(value)) => Ok(value),
+            Ok(Err(err)) => Err(err),
+            Err(err) => Err(err.into()),
+        }
+    }
+}
+
+/// Holds state needed by foreground work dispatched from background handler closures.
+struct ClientContext {
+    sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
+    session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>>,
+}
+
+fn dispatch_queue_closed_error() -> acp::Error {
+    acp::Error::internal_error().data("ACP foreground dispatch queue closed")
+}
+
+/// Work items sent from `Send` handler closures to the `!Send` foreground thread.
+trait ForegroundWorkItem: Send {
+    fn run(self: Box<Self>, cx: &mut AsyncApp, ctx: &ClientContext);
+    fn reject(self: Box<Self>);
+}
+
+type ForegroundWork = Box<dyn ForegroundWorkItem>;
+
+struct RequestForegroundWork<Req, Res>
+where
+    Req: Send + 'static,
+    Res: JsonRpcResponse + Send + 'static,
+{
+    request: Req,
+    responder: Responder<Res>,
+    handler: fn(Req, Responder<Res>, &mut AsyncApp, &ClientContext),
+}
+
+impl<Req, Res> ForegroundWorkItem for RequestForegroundWork<Req, Res>
+where
+    Req: Send + 'static,
+    Res: JsonRpcResponse + Send + 'static,
+{
+    fn run(self: Box<Self>, cx: &mut AsyncApp, ctx: &ClientContext) {
+        let Self {
+            request,
+            responder,
+            handler,
+        } = *self;
+        handler(request, responder, cx, ctx);
+    }
+
+    fn reject(self: Box<Self>) {
+        let Self { responder, .. } = *self;
+        log::error!("ACP foreground dispatch queue closed while handling inbound request");
+        responder
+            .respond_with_error(dispatch_queue_closed_error())
+            .log_err();
+    }
+}
+
+struct NotificationForegroundWork<Notif>
+where
+    Notif: Send + 'static,
+{
+    notification: Notif,
+    connection: ConnectionTo<Agent>,
+    handler: fn(Notif, &mut AsyncApp, &ClientContext),
+}
+
+impl<Notif> ForegroundWorkItem for NotificationForegroundWork<Notif>
+where
+    Notif: Send + 'static,
+{
+    fn run(self: Box<Self>, cx: &mut AsyncApp, ctx: &ClientContext) {
+        let Self {
+            notification,
+            handler,
+            ..
+        } = *self;
+        handler(notification, cx, ctx);
+    }
+
+    fn reject(self: Box<Self>) {
+        let Self { connection, .. } = *self;
+        log::error!("ACP foreground dispatch queue closed while handling inbound notification");
+        connection
+            .send_error_notification(dispatch_queue_closed_error())
+            .log_err();
+    }
+}
+
+fn enqueue_request<Req, Res>(
+    dispatch_tx: &mpsc::UnboundedSender<ForegroundWork>,
+    request: Req,
+    responder: Responder<Res>,
+    handler: fn(Req, Responder<Res>, &mut AsyncApp, &ClientContext),
+) where
+    Req: Send + 'static,
+    Res: JsonRpcResponse + Send + 'static,
+{
+    let work: ForegroundWork = Box::new(RequestForegroundWork {
+        request,
+        responder,
+        handler,
+    });
+    if let Err(err) = dispatch_tx.unbounded_send(work) {
+        err.into_inner().reject();
+    }
+}
+
+fn enqueue_notification<Notif>(
+    dispatch_tx: &mpsc::UnboundedSender<ForegroundWork>,
+    notification: Notif,
+    connection: ConnectionTo<Agent>,
+    handler: fn(Notif, &mut AsyncApp, &ClientContext),
+) where
+    Notif: Send + 'static,
+{
+    let work: ForegroundWork = Box::new(NotificationForegroundWork {
+        notification,
+        connection,
+        handler,
+    });
+    if let Err(err) = dispatch_tx.unbounded_send(work) {
+        err.into_inner().reject();
+    }
+}
+
 pub struct AcpConnection {
     id: AgentId,
     telemetry_id: SharedString,
-    connection: Rc<acp::ClientSideConnection>,
+    connection: ConnectionTo<Agent>,
     sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
     pending_sessions: Rc<RefCell<HashMap<acp::SessionId, PendingAcpSession>>>,
     auth_methods: Vec<acp::AuthMethod>,
@@ -57,7 +240,8 @@ pub struct AcpConnection {
     default_config_options: HashMap<String, String>,
     child: Option<Child>,
     session_list: Option<Rc<AcpSessionList>>,
-    _io_task: Task<Result<(), acp::Error>>,
+    _io_task: Task<()>,
+    _dispatch_task: Task<()>,
     _wait_task: Task<Result<()>>,
     _stderr_task: Task<Result<()>>,
 }
@@ -101,13 +285,13 @@ pub struct AcpSession {
 }
 
 pub struct AcpSessionList {
-    connection: Rc<acp::ClientSideConnection>,
+    connection: ConnectionTo<Agent>,
     updates_tx: smol::channel::Sender<acp_thread::SessionListUpdate>,
     updates_rx: smol::channel::Receiver<acp_thread::SessionListUpdate>,
 }
 
 impl AcpSessionList {
-    fn new(connection: Rc<acp::ClientSideConnection>) -> Self {
+    fn new(connection: ConnectionTo<Agent>) -> Self {
         let (tx, rx) = smol::channel::unbounded();
         Self {
             connection,
@@ -140,7 +324,9 @@ impl AgentSessionList for AcpSessionList {
             let acp_request = acp::ListSessionsRequest::new()
                 .cwd(request.cwd)
                 .cursor(request.cursor);
-            let response = conn.list_sessions(acp_request).await?;
+            let response = into_foreground_future(conn.send_request(acp_request))
+                .await
+                .map_err(map_acp_error)?;
             Ok(AgentSessionListResponse {
                 sessions: response
                     .sessions
@@ -206,6 +392,97 @@ pub async fn connect(
 
 const MINIMUM_SUPPORTED_VERSION: acp::ProtocolVersion = acp::ProtocolVersion::V1;
 
+/// Build a `Client` connection over `transport` with Zed's full
+/// agent→client handler set wired up.
+///
+/// All incoming requests and notifications are forwarded to the foreground
+/// dispatch queue via `dispatch_tx`, where they are handled by the
+/// `handle_*` functions on a GPUI context. The returned future drives the
+/// connection and completes when the transport closes; callers are expected
+/// to spawn it on a background executor and hold the task for the lifetime
+/// of the connection. The `connection_tx` oneshot receives the
+/// `ConnectionTo<Agent>` handle as soon as the builder runs its `main_fn`.
+fn connect_client_future(
+    name: &'static str,
+    transport: impl agent_client_protocol::ConnectTo<Client> + 'static,
+    dispatch_tx: mpsc::UnboundedSender<ForegroundWork>,
+    connection_tx: futures::channel::oneshot::Sender<ConnectionTo<Agent>>,
+) -> impl Future<Output = Result<(), acp::Error>> {
+    // Each handler forwards its inputs onto the foreground dispatch queue.
+    // The SDK requires the closure to be `Send`, so we move a clone of
+    // `dispatch_tx` into each one.
+    macro_rules! on_request {
+        ($handler:ident) => {{
+            let dispatch_tx = dispatch_tx.clone();
+            async move |req, responder, _connection| {
+                enqueue_request(&dispatch_tx, req, responder, $handler);
+                Ok(())
+            }
+        }};
+    }
+    macro_rules! on_notification {
+        ($handler:ident) => {{
+            let dispatch_tx = dispatch_tx.clone();
+            async move |notif, connection| {
+                enqueue_notification(&dispatch_tx, notif, connection, $handler);
+                Ok(())
+            }
+        }};
+    }
+
+    Client
+        .builder()
+        .name(name)
+        // --- Request handlers (agent→client) ---
+        .on_receive_request(
+            on_request!(handle_request_permission),
+            agent_client_protocol::on_receive_request!(),
+        )
+        .on_receive_request(
+            on_request!(handle_write_text_file),
+            agent_client_protocol::on_receive_request!(),
+        )
+        .on_receive_request(
+            on_request!(handle_read_text_file),
+            agent_client_protocol::on_receive_request!(),
+        )
+        .on_receive_request(
+            on_request!(handle_create_terminal),
+            agent_client_protocol::on_receive_request!(),
+        )
+        .on_receive_request(
+            on_request!(handle_kill_terminal),
+            agent_client_protocol::on_receive_request!(),
+        )
+        .on_receive_request(
+            on_request!(handle_release_terminal),
+            agent_client_protocol::on_receive_request!(),
+        )
+        .on_receive_request(
+            on_request!(handle_terminal_output),
+            agent_client_protocol::on_receive_request!(),
+        )
+        .on_receive_request(
+            on_request!(handle_wait_for_terminal_exit),
+            agent_client_protocol::on_receive_request!(),
+        )
+        // --- Notification handlers (agent→client) ---
+        .on_receive_notification(
+            on_notification!(handle_session_notification),
+            agent_client_protocol::on_receive_notification!(),
+        )
+        .connect_with(
+            transport,
+            move |connection: ConnectionTo<Agent>| async move {
+                if connection_tx.send(connection).is_err() {
+                    log::error!("failed to send ACP connection handle — receiver was dropped");
+                }
+                // Keep the connection alive until the transport closes.
+                futures::future::pending::<Result<(), acp::Error>>().await
+            },
+        )
+}
+
 impl AcpConnection {
     pub async fn stdio(
         agent_id: AgentId,
@@ -283,30 +560,93 @@ impl AcpConnection {
         let client_session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>> =
             Rc::new(RefCell::new(None));
 
-        let client = ClientDelegate {
+        // Set up the foreground dispatch channel for bridging Send handler
+        // closures to the !Send foreground thread.
+        let (dispatch_tx, dispatch_rx) = mpsc::unbounded::<ForegroundWork>();
+
+        // Register this connection with the logs panel registry. The
+        // returned tap is opt-in: until someone subscribes to the ACP logs
+        // panel, `emit_*` calls below are ~free (atomic load + return).
+        let log_tap = cx.update(|cx| {
+            AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
+                registry.set_active_connection(agent_id.clone(), cx)
+            })
+        });
+
+        let incoming_lines = futures::io::BufReader::new(stdout).lines();
+        let tapped_incoming = incoming_lines.inspect({
+            let log_tap = log_tap.clone();
+            move |result| match result {
+                Ok(line) => log_tap.emit_incoming(line),
+                Err(err) => {
+                    // I/O errors on the transport are fatal for the SDK, but
+                    // without logging them the ACP logs panel shows no trace
+                    // of why the connection died.
+                    log::warn!("ACP transport read error: {err}");
+                }
+            }
+        });
+
+        let tapped_outgoing = futures::sink::unfold(
+            (Box::pin(stdin), log_tap.clone()),
+            async move |(mut writer, log_tap), line: String| {
+                use futures::AsyncWriteExt;
+                log_tap.emit_outgoing(&line);
+                let mut bytes = line.into_bytes();
+                bytes.push(b'\n');
+                writer.write_all(&bytes).await?;
+                Ok::<_, std::io::Error>((writer, log_tap))
+            },
+        );
+
+        let transport = Lines::new(tapped_outgoing, tapped_incoming);
+
+        // `connect_client_future` installs the production handler set and
+        // hands us back both the connection-future (to run on a background
+        // executor) and a oneshot receiver that produces the
+        // `ConnectionTo<Agent>` once the transport handshake is ready.
+        let (connection_tx, connection_rx) = futures::channel::oneshot::channel();
+        let connection_future =
+            connect_client_future("zed", transport, dispatch_tx.clone(), connection_tx);
+        let io_task = cx.background_spawn(async move {
+            if let Err(err) = connection_future.await {
+                log::error!("ACP connection error: {err}");
+            }
+        });
+
+        let connection: ConnectionTo<Agent> = connection_rx
+            .await
+            .context("Failed to receive ACP connection handle")?;
+
+        // Set up the foreground dispatch loop to process work items from handlers.
+        let dispatch_context = ClientContext {
             sessions: sessions.clone(),
             session_list: client_session_list.clone(),
-            cx: cx.clone(),
         };
-        let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, {
-            let foreground_executor = cx.foreground_executor().clone();
-            move |fut| {
-                foreground_executor.spawn(fut).detach();
+        let dispatch_task = cx.spawn({
+            let mut dispatch_rx = dispatch_rx;
+            async move |cx| {
+                while let Some(work) = dispatch_rx.next().await {
+                    work.run(cx, &dispatch_context);
+                }
             }
         });
 
-        let io_task = cx.background_spawn(io_task);
-
-        let stderr_task = cx.background_spawn(async move {
-            let mut stderr = BufReader::new(stderr);
-            let mut line = String::new();
-            while let Ok(n) = stderr.read_line(&mut line).await
-                && n > 0
-            {
-                log::warn!("agent stderr: {}", line.trim());
-                line.clear();
+        let stderr_task = cx.background_spawn({
+            let log_tap = log_tap.clone();
+            async move {
+                let mut stderr = BufReader::new(stderr);
+                let mut line = String::new();
+                while let Ok(n) = stderr.read_line(&mut line).await
+                    && n > 0
+                {
+                    let trimmed = line.trim_end_matches(['\n', '\r']);
+                    log::warn!("agent stderr: {trimmed}");
+                    log_tap.emit_stderr(trimmed);
+                    line.clear();
+                }
+                Ok(())
             }
-            Ok(())
         });
 
         let wait_task = cx.spawn({
@@ -319,16 +659,8 @@ impl AcpConnection {
             }
         });
 
-        let connection = Rc::new(connection);
-
-        cx.update(|cx| {
-            AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
-                registry.set_active_connection(agent_id.clone(), &connection, cx)
-            });
-        });
-
-        let response = connection
-            .initialize(
+        let response = into_foreground_future(
+            connection.send_request(
                 acp::InitializeRequest::new(acp::ProtocolVersion::V1)
                     .client_capabilities(
                         acp::ClientCapabilities::new()
@@ -337,7 +669,6 @@ impl AcpConnection {
                                 .write_text_file(true))
                             .terminal(true)
                             .auth(acp::AuthCapabilities::new().terminal(true))
-                            // Experimental: Allow for rendering terminal output from the agents
                             .meta(acp::Meta::from_iter([
                                 ("terminal_output".into(), true.into()),
                                 ("terminal-auth".into(), true.into()),
@@ -347,8 +678,9 @@ impl AcpConnection {
                         acp::Implementation::new("zed", version)
                             .title(release_channel.map(ToOwned::to_owned)),
                     ),
-            )
-            .await?;
+            ),
+        )
+        .await?;
 
         if response.protocol_version < MINIMUM_SUPPORTED_VERSION {
             return Err(UnsupportedVersion.into());
@@ -407,6 +739,7 @@ impl AcpConnection {
             default_config_options,
             session_list,
             _io_task: io_task,
+            _dispatch_task: dispatch_task,
             _wait_task: wait_task,
             _stderr_task: stderr_task,
             child: Some(child),
@@ -419,11 +752,12 @@ impl AcpConnection {
 
     #[cfg(any(test, feature = "test-support"))]
     fn new_for_test(
-        connection: Rc<acp::ClientSideConnection>,
+        connection: ConnectionTo<Agent>,
         sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
         agent_capabilities: acp::AgentCapabilities,
         agent_server_store: WeakEntity<AgentServerStore>,
-        io_task: Task<Result<(), acp::Error>>,
+        io_task: Task<()>,
+        dispatch_task: Task<()>,
         _cx: &mut App,
     ) -> Self {
         Self {
@@ -441,6 +775,7 @@ impl AcpConnection {
             child: None,
             session_list: None,
             _io_task: io_task,
+            _dispatch_task: dispatch_task,
             _wait_task: Task::ready(Ok(())),
             _stderr_task: Task::ready(Ok(())),
         }
@@ -453,7 +788,7 @@ impl AcpConnection {
         work_dirs: PathList,
         title: Option<SharedString>,
         rpc_call: impl FnOnce(
-            Rc<acp::ClientSideConnection>,
+            ConnectionTo<Agent>,
             acp::SessionId,
             PathBuf,
         )
@@ -647,14 +982,15 @@ impl AcpConnection {
                 let config_opts = config_options.clone();
                 let conn = self.connection.clone();
                 async move |_| {
-                    let result = conn
-                        .set_session_config_option(acp::SetSessionConfigOptionRequest::new(
+                    let result = into_foreground_future(conn.send_request(
+                        acp::SetSessionConfigOptionRequest::new(
                             session_id,
                             config_id_clone.clone(),
                             default_value_id,
-                        ))
-                        .await
-                        .log_err();
+                        ),
+                    ))
+                    .await
+                    .log_err();
 
                     if result.is_none() {
                         if let Some(initial) = initial_value {
@@ -781,17 +1117,23 @@ impl AgentConnection for AcpConnection {
         let mcp_servers = mcp_servers_for_project(&project, cx);
 
         cx.spawn(async move |cx| {
-            let response = self.connection
-                .new_session(acp::NewSessionRequest::new(cwd.clone()).mcp_servers(mcp_servers))
-                .await
-                .map_err(map_acp_error)?;
+            let response = into_foreground_future(
+                self.connection
+                    .send_request(acp::NewSessionRequest::new(cwd.clone()).mcp_servers(mcp_servers)),
+            )
+            .await
+            .map_err(map_acp_error)?;
 
-            let (modes, models, config_options) = config_state(response.modes, response.models, response.config_options);
+            let (modes, models, config_options) =
+                config_state(response.modes, response.models, response.config_options);
 
             if let Some(default_mode) = self.default_mode.clone() {
                 if let Some(modes) = modes.as_ref() {
                     let mut modes_ref = modes.borrow_mut();
-                    let has_mode = modes_ref.available_modes.iter().any(|mode| mode.id == default_mode);
+                    let has_mode = modes_ref
+                        .available_modes
+                        .iter()
+                        .any(|mode| mode.id == default_mode);
 
                     if has_mode {
                         let initial_mode_id = modes_ref.current_mode_id.clone();
@@ -802,14 +1144,21 @@ impl AgentConnection for AcpConnection {
                             let modes = modes.clone();
                             let conn = self.connection.clone();
                             async move |_| {
-                                let result = conn.set_session_mode(acp::SetSessionModeRequest::new(session_id, default_mode))
-                                .await.log_err();
+                                let result = into_foreground_future(
+                                    conn.send_request(acp::SetSessionModeRequest::new(
+                                        session_id,
+                                        default_mode,
+                                    )),
+                                )
+                                .await
+                                .log_err();
 
                                 if result.is_none() {
                                     modes.borrow_mut().current_mode_id = initial_mode_id;
                                 }
                             }
-                        }).detach();
+                        })
+                        .detach();
 
                         modes_ref.current_mode_id = default_mode;
                     } else {
@@ -830,7 +1179,10 @@ impl AgentConnection for AcpConnection {
             if let Some(default_model) = self.default_model.clone() {
                 if let Some(models) = models.as_ref() {
                     let mut models_ref = models.borrow_mut();
-                    let has_model = models_ref.available_models.iter().any(|model| model.model_id == default_model);
+                    let has_model = models_ref
+                        .available_models
+                        .iter()
+                        .any(|model| model.model_id == default_model);
 
                     if has_model {
                         let initial_model_id = models_ref.current_model_id.clone();
@@ -841,14 +1193,21 @@ impl AgentConnection for AcpConnection {
                             let models = models.clone();
                             let conn = self.connection.clone();
                             async move |_| {
-                                let result = conn.set_session_model(acp::SetSessionModelRequest::new(session_id, default_model))
-                                .await.log_err();
+                                let result = into_foreground_future(
+                                    conn.send_request(acp::SetSessionModelRequest::new(
+                                        session_id,
+                                        default_model,
+                                    )),
+                                )
+                                .await
+                                .log_err();
 
                                 if result.is_none() {
                                     models.borrow_mut().current_model_id = initial_model_id;
                                 }
                             }
-                        }).detach();
+                        })
+                        .detach();
 
                         models_ref.current_model_id = default_model;
                     } else {
@@ -881,7 +1240,9 @@ impl AgentConnection for AcpConnection {
                     action_log,
                     response.session_id.clone(),
                     // ACP doesn't currently support per-session prompt capabilities or changing capabilities dynamically.
-                    watch::Receiver::constant(self.agent_capabilities.prompt_capabilities.clone()),
+                    watch::Receiver::constant(
+                        self.agent_capabilities.prompt_capabilities.clone(),
+                    ),
                     cx,
                 )
             });
@@ -935,12 +1296,14 @@ impl AgentConnection for AcpConnection {
             title,
             move |connection, session_id, cwd| {
                 Box::pin(async move {
-                    let response = connection
-                        .load_session(
-                            acp::LoadSessionRequest::new(session_id, cwd).mcp_servers(mcp_servers),
-                        )
-                        .await
-                        .map_err(map_acp_error)?;
+                    let response = into_foreground_future(
+                        connection.send_request(
+                            acp::LoadSessionRequest::new(session_id.clone(), cwd)
+                                .mcp_servers(mcp_servers),
+                        ),
+                    )
+                    .await
+                    .map_err(map_acp_error)?;
                     Ok(SessionConfigResponse {
                         modes: response.modes,
                         models: response.models,
@@ -979,13 +1342,14 @@ impl AgentConnection for AcpConnection {
             title,
             move |connection, session_id, cwd| {
                 Box::pin(async move {
-                    let response = connection
-                        .resume_session(
-                            acp::ResumeSessionRequest::new(session_id, cwd)
+                    let response = into_foreground_future(
+                        connection.send_request(
+                            acp::ResumeSessionRequest::new(session_id.clone(), cwd)
                                 .mcp_servers(mcp_servers),
-                        )
-                        .await
-                        .map_err(map_acp_error)?;
+                        ),
+                    )
+                    .await
+                    .map_err(map_acp_error)?;
                     Ok(SessionConfigResponse {
                         modes: response.modes,
                         models: response.models,
@@ -1034,8 +1398,10 @@ impl AgentConnection for AcpConnection {
                 let conn = self.connection.clone();
                 let session_id = session_id.clone();
                 return cx.foreground_executor().spawn(async move {
-                    conn.close_session(acp::CloseSessionRequest::new(session_id))
-                        .await?;
+                    into_foreground_future(
+                        conn.send_request(acp::CloseSessionRequest::new(session_id)),
+                    )
+                    .await?;
                     Ok(())
                 });
             }
@@ -1059,8 +1425,10 @@ impl AgentConnection for AcpConnection {
         let conn = self.connection.clone();
         let session_id = session_id.clone();
         cx.foreground_executor().spawn(async move {
-            conn.close_session(acp::CloseSessionRequest::new(session_id))
-                .await?;
+            into_foreground_future(
+                conn.send_request(acp::CloseSessionRequest::new(session_id.clone())),
+            )
+            .await?;
             Ok(())
         })
     }
@@ -1109,7 +1477,7 @@ impl AgentConnection for AcpConnection {
     fn authenticate(&self, method_id: acp::AuthMethodId, cx: &mut App) -> Task<Result<()>> {
         let conn = self.connection.clone();
         cx.foreground_executor().spawn(async move {
-            conn.authenticate(acp::AuthenticateRequest::new(method_id))
+            into_foreground_future(conn.send_request(acp::AuthenticateRequest::new(method_id)))
                 .await?;
             Ok(())
         })
@@ -1125,7 +1493,7 @@ impl AgentConnection for AcpConnection {
         let sessions = self.sessions.clone();
         let session_id = params.session_id.clone();
         cx.foreground_executor().spawn(async move {
-            let result = conn.prompt(params).await;
+            let result = into_foreground_future(conn.send_request(params)).await;
 
             let mut suppress_abort_err = false;
 
@@ -1176,15 +1544,12 @@ impl AgentConnection for AcpConnection {
         })
     }
 
-    fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {
+    fn cancel(&self, session_id: &acp::SessionId, _cx: &mut App) {
         if let Some(session) = self.sessions.borrow_mut().get_mut(session_id) {
             session.suppress_abort_err = true;
         }
-        let conn = self.connection.clone();
         let params = acp::CancelNotification::new(session_id.clone());
-        cx.foreground_executor()
-            .spawn(async move { conn.cancel(params).await })
-            .detach();
+        self.connection.send_notification(params).log_err();
     }
 
     fn session_modes(
@@ -1544,73 +1909,6 @@ pub mod test_support {
         }
     }
 
-    struct FakeAcpAgent {
-        load_session_count: Arc<AtomicUsize>,
-        close_session_count: Arc<AtomicUsize>,
-        fail_next_prompt: Arc<AtomicBool>,
-    }
-
-    #[async_trait::async_trait(?Send)]
-    impl acp::Agent for FakeAcpAgent {
-        async fn initialize(
-            &self,
-            args: acp::InitializeRequest,
-        ) -> acp::Result<acp::InitializeResponse> {
-            Ok(
-                acp::InitializeResponse::new(args.protocol_version).agent_capabilities(
-                    acp::AgentCapabilities::default()
-                        .load_session(true)
-                        .session_capabilities(
-                            acp::SessionCapabilities::default()
-                                .close(acp::SessionCloseCapabilities::new()),
-                        ),
-                ),
-            )
-        }
-
-        async fn authenticate(
-            &self,
-            _: acp::AuthenticateRequest,
-        ) -> acp::Result<acp::AuthenticateResponse> {
-            Ok(Default::default())
-        }
-
-        async fn new_session(
-            &self,
-            _: acp::NewSessionRequest,
-        ) -> acp::Result<acp::NewSessionResponse> {
-            Ok(acp::NewSessionResponse::new(acp::SessionId::new("unused")))
-        }
-
-        async fn prompt(&self, _: acp::PromptRequest) -> acp::Result<acp::PromptResponse> {
-            if self.fail_next_prompt.swap(false, Ordering::SeqCst) {
-                Err(acp::ErrorCode::InternalError.into())
-            } else {
-                Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))
-            }
-        }
-
-        async fn cancel(&self, _: acp::CancelNotification) -> acp::Result<()> {
-            Ok(())
-        }
-
-        async fn load_session(
-            &self,
-            _: acp::LoadSessionRequest,
-        ) -> acp::Result<acp::LoadSessionResponse> {
-            self.load_session_count.fetch_add(1, Ordering::SeqCst);
-            Ok(acp::LoadSessionResponse::new())
-        }
-
-        async fn close_session(
-            &self,
-            _: acp::CloseSessionRequest,
-        ) -> acp::Result<acp::CloseSessionResponse> {
-            self.close_session_count.fetch_add(1, Ordering::SeqCst);
-            Ok(acp::CloseSessionResponse::new())
-        }
-    }
-
     async fn build_fake_acp_connection(
         project: Entity<Project>,
         load_session_count: Arc<AtomicUsize>,
@@ -1618,63 +1916,135 @@ pub mod test_support {
         fail_next_prompt: Arc<AtomicBool>,
         cx: &mut AsyncApp,
     ) -> Result<FakeAcpConnectionHarness> {
-        let (c2a_writer, c2a_reader) = async_pipe::pipe();
-        let (a2c_writer, a2c_reader) = async_pipe::pipe();
+        let (client_transport, agent_transport) = agent_client_protocol::Channel::duplex();
 
         let sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>> =
             Rc::new(RefCell::new(HashMap::default()));
-        let session_list_container: Rc<RefCell<Option<Rc<AcpSessionList>>>> =
+        let client_session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>> =
             Rc::new(RefCell::new(None));
 
-        let foreground = cx.foreground_executor().clone();
-
-        let client_delegate = ClientDelegate {
-            sessions: sessions.clone(),
-            session_list: session_list_container,
-            cx: cx.clone(),
-        };
+        let agent_future = Agent
+            .builder()
+            .name("fake-agent")
+            .on_receive_request(
+                async move |req: acp::InitializeRequest, responder, _cx| {
+                    responder.respond(
+                        acp::InitializeResponse::new(req.protocol_version).agent_capabilities(
+                            acp::AgentCapabilities::default()
+                                .load_session(true)
+                                .session_capabilities(
+                                    acp::SessionCapabilities::default()
+                                        .close(acp::SessionCloseCapabilities::new()),
+                                ),
+                        ),
+                    )
+                },
+                agent_client_protocol::on_receive_request!(),
+            )
+            .on_receive_request(
+                async move |_req: acp::AuthenticateRequest, responder, _cx| {
+                    responder.respond(Default::default())
+                },
+                agent_client_protocol::on_receive_request!(),
+            )
+            .on_receive_request(
+                async move |_req: acp::NewSessionRequest, responder, _cx| {
+                    responder.respond(acp::NewSessionResponse::new(acp::SessionId::new("unused")))
+                },
+                agent_client_protocol::on_receive_request!(),
+            )
+            .on_receive_request(
+                {
+                    let fail_next_prompt = fail_next_prompt.clone();
+                    async move |_req: acp::PromptRequest, responder, _cx| {
+                        if fail_next_prompt.swap(false, Ordering::SeqCst) {
+                            responder.respond_with_error(acp::ErrorCode::InternalError.into())
+                        } else {
+                            responder.respond(acp::PromptResponse::new(acp::StopReason::EndTurn))
+                        }
+                    }
+                },
+                agent_client_protocol::on_receive_request!(),
+            )
+            .on_receive_request(
+                {
+                    let load_session_count = load_session_count.clone();
+                    async move |_req: acp::LoadSessionRequest, responder, _cx| {
+                        load_session_count.fetch_add(1, Ordering::SeqCst);
+                        responder.respond(acp::LoadSessionResponse::new())
+                    }
+                },
+                agent_client_protocol::on_receive_request!(),
+            )
+            .on_receive_request(
+                {
+                    let close_session_count = close_session_count.clone();
+                    async move |_req: acp::CloseSessionRequest, responder, _cx| {
+                        close_session_count.fetch_add(1, Ordering::SeqCst);
+                        responder.respond(acp::CloseSessionResponse::new())
+                    }
+                },
+                agent_client_protocol::on_receive_request!(),
+            )
+            .on_receive_notification(
+                async move |_notif: acp::CancelNotification, _cx| Ok(()),
+                agent_client_protocol::on_receive_notification!(),
+            )
+            .connect_to(agent_transport);
 
-        let (client_conn, client_io_task) =
-            acp::ClientSideConnection::new(client_delegate, c2a_writer, a2c_reader, {
-                let foreground = foreground.clone();
-                move |fut| {
-                    foreground.spawn(fut).detach();
-                }
-            });
+        let agent_io_task = cx.background_spawn(agent_future);
 
-        let fake_agent = FakeAcpAgent {
-            load_session_count: load_session_count.clone(),
-            close_session_count: close_session_count.clone(),
-            fail_next_prompt,
-        };
+        // Wire the production handler set into the fake client so inbound
+        // requests/notifications from the fake agent are dispatched the
+        // same way the real `stdio` path does.
+        let (dispatch_tx, dispatch_rx) = mpsc::unbounded::<ForegroundWork>();
 
-        let (_, agent_io_task) =
-            acp::AgentSideConnection::new(fake_agent, a2c_writer, c2a_reader, {
-                let foreground = foreground.clone();
-                move |fut| {
-                    foreground.spawn(fut).detach();
-                }
-            });
+        let (connection_tx, connection_rx) = futures::channel::oneshot::channel();
+        let client_future = connect_client_future(
+            "zed-test",
+            client_transport,
+            dispatch_tx.clone(),
+            connection_tx,
+        );
+        let client_io_task = cx.background_spawn(async move {
+            client_future.await.ok();
+        });
 
-        let client_io_task = cx.background_spawn(client_io_task);
-        let agent_io_task = cx.background_spawn(agent_io_task);
+        let client_conn: ConnectionTo<Agent> = connection_rx
+            .await
+            .context("failed to receive fake ACP connection handle")?;
 
-        let response = client_conn
-            .initialize(acp::InitializeRequest::new(acp::ProtocolVersion::V1))
-            .await?;
+        let response = into_foreground_future(
+            client_conn.send_request(acp::InitializeRequest::new(acp::ProtocolVersion::V1)),
+        )
+        .await?;
 
         let agent_capabilities = response.agent_capabilities;
 
+        let dispatch_context = ClientContext {
+            sessions: sessions.clone(),
+            session_list: client_session_list.clone(),
+        };
+        let dispatch_task = cx.spawn({
+            let mut dispatch_rx = dispatch_rx;
+            async move |cx| {
+                while let Some(work) = dispatch_rx.next().await {
+                    work.run(cx, &dispatch_context);
+                }
+            }
+        });
+
         let agent_server_store =
             project.read_with(cx, |project, _| project.agent_server_store().downgrade());
 
         let connection = cx.update(|cx| {
             AcpConnection::new_for_test(
-                Rc::new(client_conn),
+                client_conn,
                 sessions,
                 agent_capabilities,
                 agent_server_store,
                 client_io_task,
+                dispatch_task,
                 cx,
             )
         });

crates/agent_servers/src/agent_servers.rs 🔗

@@ -12,6 +12,7 @@ use http_client::read_no_proxy_from_env;
 use project::{AgentId, Project, agent_server_store::AgentServerStore};
 
 use acp_thread::AgentConnection;
+use agent_client_protocol::schema as acp_schema;
 use anyhow::Result;
 use gpui::{App, AppContext, Entity, Task};
 use settings::SettingsStore;
@@ -52,31 +53,31 @@ pub trait AgentServer: Send {
 
     fn into_any(self: Rc<Self>) -> Rc<dyn Any>;
 
-    fn default_mode(&self, _cx: &App) -> Option<agent_client_protocol::SessionModeId> {
+    fn default_mode(&self, _cx: &App) -> Option<acp_schema::SessionModeId> {
         None
     }
 
     fn set_default_mode(
         &self,
-        _mode_id: Option<agent_client_protocol::SessionModeId>,
+        _mode_id: Option<acp_schema::SessionModeId>,
         _fs: Arc<dyn Fs>,
         _cx: &mut App,
     ) {
     }
 
-    fn default_model(&self, _cx: &App) -> Option<agent_client_protocol::ModelId> {
+    fn default_model(&self, _cx: &App) -> Option<acp_schema::ModelId> {
         None
     }
 
     fn set_default_model(
         &self,
-        _model_id: Option<agent_client_protocol::ModelId>,
+        _model_id: Option<acp_schema::ModelId>,
         _fs: Arc<dyn Fs>,
         _cx: &mut App,
     ) {
     }
 
-    fn favorite_model_ids(&self, _cx: &mut App) -> HashSet<agent_client_protocol::ModelId> {
+    fn favorite_model_ids(&self, _cx: &mut App) -> HashSet<acp_schema::ModelId> {
         HashSet::default()
     }
 
@@ -95,16 +96,16 @@ pub trait AgentServer: Send {
 
     fn favorite_config_option_value_ids(
         &self,
-        _config_id: &agent_client_protocol::SessionConfigId,
+        _config_id: &acp_schema::SessionConfigId,
         _cx: &mut App,
-    ) -> HashSet<agent_client_protocol::SessionConfigValueId> {
+    ) -> HashSet<acp_schema::SessionConfigValueId> {
         HashSet::default()
     }
 
     fn toggle_favorite_config_option_value(
         &self,
-        _config_id: agent_client_protocol::SessionConfigId,
-        _value_id: agent_client_protocol::SessionConfigValueId,
+        _config_id: acp_schema::SessionConfigId,
+        _value_id: acp_schema::SessionConfigValueId,
         _should_be_favorite: bool,
         _fs: Arc<dyn Fs>,
         _cx: &App,
@@ -113,7 +114,7 @@ pub trait AgentServer: Send {
 
     fn toggle_favorite_model(
         &self,
-        _model_id: agent_client_protocol::ModelId,
+        _model_id: acp_schema::ModelId,
         _should_be_favorite: bool,
         _fs: Arc<dyn Fs>,
         _cx: &App,

crates/agent_servers/src/custom.rs 🔗

@@ -1,6 +1,6 @@
 use crate::{AgentServer, AgentServerDelegate, load_proxy_env};
 use acp_thread::AgentConnection;
-use agent_client_protocol as acp;
+use agent_client_protocol::schema as acp;
 use anyhow::{Context as _, Result};
 use collections::HashSet;
 use fs::Fs;

crates/agent_servers/src/e2e_tests.rs 🔗

@@ -1,6 +1,6 @@
 use crate::{AgentServer, AgentServerDelegate};
 use acp_thread::{AcpThread, AgentThreadEntry, ToolCall, ToolCallStatus};
-use agent_client_protocol as acp;
+use agent_client_protocol::schema as acp;
 use client::RefreshLlmTokenListener;
 use futures::{FutureExt, StreamExt, channel::mpsc, select};
 use gpui::AppContext;
@@ -379,7 +379,7 @@ macro_rules! common_e2e_tests {
             async fn tool_call_with_permission(cx: &mut ::gpui::TestAppContext) {
                 $crate::e2e_tests::test_tool_call_with_permission(
                     $server,
-                    ::agent_client_protocol::PermissionOptionId::new($allow_option_id),
+                    ::agent_client_protocol::schema::PermissionOptionId::new($allow_option_id),
                     cx,
                 )
                 .await;

crates/agent_settings/src/agent_settings.rs 🔗

@@ -3,7 +3,7 @@ mod agent_profile;
 use std::path::{Component, Path};
 use std::sync::{Arc, LazyLock};
 
-use agent_client_protocol::ModelId;
+use agent_client_protocol::schema as acp;
 use collections::{HashSet, IndexMap};
 use fs::Fs;
 use futures::channel::oneshot;
@@ -204,10 +204,10 @@ impl AgentSettings {
         self.message_editor_min_lines * 2
     }
 
-    pub fn favorite_model_ids(&self) -> HashSet<ModelId> {
+    pub fn favorite_model_ids(&self) -> HashSet<acp::ModelId> {
         self.favorite_models
             .iter()
-            .map(|sel| ModelId::new(format!("{}/{}", sel.provider.0, sel.model)))
+            .map(|sel| acp::ModelId::new(format!("{}/{}", sel.provider.0, sel.model)))
             .collect()
     }
 }

crates/agent_ui/src/agent_panel.rs 🔗

@@ -10,7 +10,7 @@ use std::{
 
 use acp_thread::{AcpThread, AcpThreadEvent, MentionUri, ThreadStatus};
 use agent::{ContextServerRegistry, SharedThread, ThreadStore};
-use agent_client_protocol as acp;
+use agent_client_protocol::schema as acp;
 use agent_servers::AgentServer;
 use collections::HashSet;
 use db::kvp::{Dismissable, KeyValueStore};

crates/agent_ui/src/agent_ui.rs 🔗

@@ -38,7 +38,7 @@ use std::rc::Rc;
 use std::sync::Arc;
 
 use ::ui::IconName;
-use agent_client_protocol as acp;
+use agent_client_protocol::schema as acp;
 use agent_settings::{AgentProfileId, AgentSettings};
 use command_palette_hooks::CommandPaletteFilter;
 use feature_flags::FeatureFlagAppExt as _;
@@ -255,7 +255,7 @@ pub struct NewExternalAgentThread {
 #[action(namespace = agent)]
 #[serde(deny_unknown_fields)]
 pub struct NewNativeAgentThreadFromSummary {
-    from_session_id: agent_client_protocol::SessionId,
+    from_session_id: acp::SessionId,
 }
 
 #[derive(Debug, Default, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, JsonSchema)]
@@ -339,7 +339,7 @@ pub enum AgentInitialContent {
         title: Option<SharedString>,
     },
     ContentBlock {
-        blocks: Vec<agent_client_protocol::ContentBlock>,
+        blocks: Vec<acp::ContentBlock>,
         auto_submit: bool,
     },
     FromExternalSource(ExternalSourcePrompt),

crates/agent_ui/src/completion_provider.rs 🔗

@@ -7,7 +7,7 @@ use std::sync::atomic::AtomicBool;
 use crate::DEFAULT_THREAD_TITLE;
 use crate::thread_metadata_store::{ThreadMetadata, ThreadMetadataStore};
 use acp_thread::MentionUri;
-use agent_client_protocol as acp;
+use agent_client_protocol::schema as acp;
 use anyhow::Result;
 use editor::{CompletionProvider, Editor, code_context_menus::COMPLETION_MENU_MAX_WIDTH};
 use futures::FutureExt as _;

crates/agent_ui/src/config_options.rs 🔗

@@ -1,7 +1,7 @@
 use std::{cmp::Reverse, rc::Rc, sync::Arc};
 
 use acp_thread::AgentSessionConfigOptions;
-use agent_client_protocol as acp;
+use agent_client_protocol::schema as acp;
 use agent_servers::AgentServer;
 
 use collections::HashSet;

crates/agent_ui/src/conversation_view.rs 🔗

@@ -10,7 +10,7 @@ use action_log::{ActionLog, ActionLogTelemetry, DiffStats};
 use agent::{
     NativeAgentServer, NativeAgentSessionList, NoModelConfiguredError, SharedThread, ThreadStore,
 };
-use agent_client_protocol as acp;
+use agent_client_protocol::schema as acp;
 #[cfg(test)]
 use agent_servers::AgentServerDelegate;
 use agent_servers::{AgentServer, GEMINI_TERMINAL_AUTH_METHOD_ID};
@@ -2852,7 +2852,6 @@ pub(crate) mod tests {
     use acp_thread::StubAgentConnection;
     use action_log::ActionLog;
     use agent::{AgentTool, EditFileTool, FetchTool, TerminalTool, ToolPermissionContext};
-    use agent_client_protocol::SessionId;
     use agent_servers::FakeAcpAgentServer;
     use editor::MultiBufferOffset;
     use fs::FakeFs;
@@ -3034,7 +3033,7 @@ pub(crate) mod tests {
                     Rc::new(StubAgentServer::new(ResumeOnlyAgentConnection)),
                     connection_store,
                     Agent::Custom { id: "Test".into() },
-                    Some(SessionId::new("resume-session")),
+                    Some(acp::SessionId::new("resume-session")),
                     None,
                     None,
                     None,
@@ -3081,7 +3080,7 @@ pub(crate) mod tests {
                 self,
                 project,
                 "RestoredAvailableCommandsConnection",
-                SessionId::new("new-session"),
+                acp::SessionId::new("new-session"),
                 cx,
             );
             Task::ready(Ok(thread))
@@ -3171,7 +3170,7 @@ pub(crate) mod tests {
                     Rc::new(StubAgentServer::new(RestoredAvailableCommandsConnection)),
                     connection_store,
                     Agent::Custom { id: "Test".into() },
-                    Some(SessionId::new("restored-session")),
+                    Some(acp::SessionId::new("restored-session")),
                     None,
                     None,
                     None,
@@ -3253,7 +3252,7 @@ pub(crate) mod tests {
                     Rc::new(StubAgentServer::new(connection)),
                     connection_store,
                     Agent::Custom { id: "Test".into() },
-                    Some(SessionId::new("session-1")),
+                    Some(acp::SessionId::new("session-1")),
                     None,
                     Some(PathList::new(&[PathBuf::from("/project/subdir")])),
                     None,
@@ -3360,7 +3359,7 @@ pub(crate) mod tests {
             cx.update(|_window, cx| cx.new(|cx| AgentConnectionStore::new(project.clone(), cx)));
 
         // Simulate a previous run that persisted metadata for this session.
-        let resume_session_id = SessionId::new("persistent-session");
+        let resume_session_id = acp::SessionId::new("persistent-session");
         let stored_title: SharedString = "Persistent chat".into();
         cx.update(|_window, cx| {
             ThreadMetadataStore::global(cx).update(cx, |store, cx| {
@@ -4320,7 +4319,7 @@ pub(crate) mod tests {
         connection: Rc<dyn AgentConnection>,
         project: Entity<Project>,
         name: &'static str,
-        session_id: SessionId,
+        session_id: acp::SessionId,
         cx: &mut App,
     ) -> Entity<AcpThread> {
         let action_log = cx.new(|_| ActionLog::new(project.clone()));
@@ -4366,7 +4365,7 @@ pub(crate) mod tests {
                 self,
                 project,
                 "ResumeOnlyAgentConnection",
-                SessionId::new("new-session"),
+                acp::SessionId::new("new-session"),
                 cx,
             );
             Task::ready(Ok(thread))
@@ -4546,7 +4545,7 @@ pub(crate) mod tests {
                     self,
                     project,
                     action_log,
-                    SessionId::new("test"),
+                    acp::SessionId::new("test"),
                     watch::Receiver::constant(
                         acp::PromptCapabilities::new()
                             .image(true)
@@ -4626,7 +4625,7 @@ pub(crate) mod tests {
                     self.clone(),
                     project,
                     action_log,
-                    SessionId::new("new-session"),
+                    acp::SessionId::new("new-session"),
                     watch::Receiver::constant(
                         acp::PromptCapabilities::new()
                             .image(true)
@@ -7252,7 +7251,7 @@ pub(crate) mod tests {
                     self,
                     project,
                     action_log,
-                    SessionId::new("close-capable-session"),
+                    acp::SessionId::new("close-capable-session"),
                     watch::Receiver::constant(
                         acp::PromptCapabilities::new()
                             .image(true)

crates/agent_ui/src/conversation_view/thread_view.rs 🔗

@@ -2,6 +2,7 @@ use crate::{
     DEFAULT_THREAD_TITLE, SelectPermissionGranularity,
     agent_configuration::configure_context_server_modal::default_markdown_style,
 };
+use agent_client_protocol::schema as acp;
 use std::cell::RefCell;
 
 use acp_thread::{ContentBlock, PlanEntry};
@@ -289,8 +290,8 @@ pub struct ThreadView {
     pub session_capabilities: SharedSessionCapabilities,
     /// Tracks which tool calls have their content/output expanded.
     /// Used for showing/hiding tool call results, terminal output, etc.
-    pub expanded_tool_calls: HashSet<agent_client_protocol::ToolCallId>,
-    pub expanded_tool_call_raw_inputs: HashSet<agent_client_protocol::ToolCallId>,
+    pub expanded_tool_calls: HashSet<acp::ToolCallId>,
+    pub expanded_tool_call_raw_inputs: HashSet<acp::ToolCallId>,
     pub expanded_thinking_blocks: HashSet<(usize, usize)>,
     auto_expanded_thinking_block: Option<(usize, usize)>,
     user_toggled_thinking_blocks: HashSet<(usize, usize)>,
@@ -306,12 +307,11 @@ pub struct ThreadView {
     pub queued_message_editor_subscriptions: Vec<Subscription>,
     pub last_synced_queue_length: usize,
     pub turn_fields: TurnFields,
-    pub discarded_partial_edits: HashSet<agent_client_protocol::ToolCallId>,
+    pub discarded_partial_edits: HashSet<acp::ToolCallId>,
     pub is_loading_contents: bool,
     pub new_server_version_available: Option<SharedString>,
     pub resumed_without_history: bool,
-    pub(crate) permission_selections:
-        HashMap<agent_client_protocol::ToolCallId, PermissionSelection>,
+    pub(crate) permission_selections: HashMap<acp::ToolCallId, PermissionSelection>,
     pub resume_thread_metadata: Option<AgentSessionInfo>,
     pub _cancel_task: Option<Task<()>>,
     _save_task: Option<Task<()>>,

crates/agent_ui/src/entry_view_state.rs 🔗

@@ -2,7 +2,7 @@ use std::ops::Range;
 
 use acp_thread::{AcpThread, AgentThreadEntry};
 use agent::ThreadStore;
-use agent_client_protocol::ToolCallId;
+use agent_client_protocol::schema as acp;
 use collections::HashMap;
 use editor::{Editor, EditorEvent, EditorMode, MinimapVisibility, SizingBehavior};
 use gpui::{
@@ -283,9 +283,9 @@ pub struct EntryViewEvent {
 }
 
 pub enum ViewEvent {
-    NewDiff(ToolCallId),
-    NewTerminal(ToolCallId),
-    TerminalMovedToBackground(ToolCallId),
+    NewDiff(acp::ToolCallId),
+    NewTerminal(acp::ToolCallId),
+    TerminalMovedToBackground(acp::ToolCallId),
     MessageEditorEvent(Entity<MessageEditor>, MessageEditorEvent),
     OpenDiffLocation {
         path: String,
@@ -482,7 +482,7 @@ mod tests {
     use std::sync::Arc;
 
     use acp_thread::{AgentConnection, StubAgentConnection};
-    use agent_client_protocol as acp;
+    use agent_client_protocol::schema as acp;
     use buffer_diff::{DiffHunkStatus, DiffHunkStatusKind};
     use editor::RowInfo;
     use fs::FakeFs;

crates/agent_ui/src/mention_set.rs 🔗

@@ -1,7 +1,7 @@
 use crate::diagnostics::{DiagnosticsOptions, codeblock_fence_for_path, collect_diagnostics};
 use acp_thread::{MentionUri, selection_name};
 use agent::{ThreadStore, outline};
-use agent_client_protocol as acp;
+use agent_client_protocol::schema as acp;
 use agent_servers::{AgentServer, AgentServerDelegate};
 use anyhow::{Context as _, Result, anyhow};
 use collections::{HashMap, HashSet};

crates/agent_ui/src/message_editor.rs 🔗

@@ -10,7 +10,7 @@ use crate::{
 };
 use acp_thread::MentionUri;
 use agent::ThreadStore;
-use agent_client_protocol as acp;
+use agent_client_protocol::schema as acp;
 use anyhow::{Result, anyhow};
 use editor::{
     Addon, AnchorRangeExt, ContextMenuOptions, Editor, EditorElement, EditorEvent, EditorMode,
@@ -1907,7 +1907,7 @@ mod tests {
 
     use acp_thread::MentionUri;
     use agent::{ThreadStore, outline};
-    use agent_client_protocol as acp;
+    use agent_client_protocol::schema as acp;
     use base64::Engine as _;
     use editor::{
         AnchorRangeExt as _, Editor, EditorMode, MultiBufferOffset, SelectionEffects,

crates/agent_ui/src/mode_selector.rs 🔗

@@ -1,5 +1,5 @@
 use acp_thread::AgentSessionModes;
-use agent_client_protocol as acp;
+use agent_client_protocol::schema as acp;
 use agent_servers::AgentServer;
 
 use fs::Fs;

crates/agent_ui/src/model_selector.rs 🔗

@@ -1,7 +1,7 @@
 use std::{cmp::Reverse, rc::Rc, sync::Arc};
 
 use acp_thread::{AgentModelIcon, AgentModelInfo, AgentModelList, AgentModelSelector};
-use agent_client_protocol::ModelId;
+use agent_client_protocol::schema as acp;
 use agent_servers::AgentServer;
 
 use anyhow::Result;
@@ -57,7 +57,7 @@ pub struct ModelPickerDelegate {
     selected_index: usize,
     selected_description: Option<(usize, SharedString, bool)>,
     selected_model: Option<AgentModelInfo>,
-    favorites: HashSet<ModelId>,
+    favorites: HashSet<acp::ModelId>,
     _refresh_models_task: Task<()>,
     _settings_subscription: Subscription,
     focus_handle: FocusHandle,
@@ -424,7 +424,7 @@ impl PickerDelegate for ModelPickerDelegate {
 
 fn info_list_to_picker_entries(
     model_list: AgentModelList,
-    favorites: &HashSet<ModelId>,
+    favorites: &HashSet<acp::ModelId>,
 ) -> Vec<ModelPickerEntry> {
     let mut entries = Vec::new();
 
@@ -530,7 +530,6 @@ async fn fuzzy_search(
 
 #[cfg(test)]
 mod tests {
-    use agent_client_protocol as acp;
     use gpui::TestAppContext;
 
     use super::*;
@@ -592,10 +591,10 @@ mod tests {
         }
     }
 
-    fn create_favorites(models: Vec<&str>) -> HashSet<ModelId> {
+    fn create_favorites(models: Vec<&str>) -> HashSet<acp::ModelId> {
         models
             .into_iter()
-            .map(|m| ModelId::new(m.to_string()))
+            .map(|m| acp::ModelId::new(m.to_string()))
             .collect()
     }
 
@@ -791,7 +790,7 @@ mod tests {
 
     #[gpui::test]
     fn test_favorites_count_returns_correct_count(_cx: &mut TestAppContext) {
-        let empty_favorites: HashSet<ModelId> = HashSet::default();
+        let empty_favorites: HashSet<acp::ModelId> = HashSet::default();
         assert_eq!(empty_favorites.len(), 0);
 
         let one_favorite = create_favorites(vec!["model-a"]);

crates/agent_ui/src/test_support.rs 🔗

@@ -1,5 +1,5 @@
 use acp_thread::{AgentConnection, StubAgentConnection};
-use agent_client_protocol as acp;
+use agent_client_protocol::schema as acp;
 use agent_servers::{AgentServer, AgentServerDelegate};
 use gpui::{Entity, Task, TestAppContext, VisualTestContext};
 use project::AgentId;

crates/agent_ui/src/thread_import.rs 🔗

@@ -1,6 +1,6 @@
 use acp_thread::AgentSessionListRequest;
 use agent::ThreadStore;
-use agent_client_protocol as acp;
+use agent_client_protocol::schema as acp;
 use chrono::Utc;
 use collections::HashSet;
 use db::kvp::Dismissable;

crates/agent_ui/src/thread_metadata_store.rs 🔗

@@ -4,7 +4,7 @@ use std::{
 };
 
 use agent::{ThreadStore, ZED_AGENT_ID};
-use agent_client_protocol as acp;
+use agent_client_protocol::schema as acp;
 use anyhow::Context as _;
 use chrono::{DateTime, Utc};
 use collections::{HashMap, HashSet};
@@ -1680,8 +1680,7 @@ mod tests {
     use acp_thread::StubAgentConnection;
     use action_log::ActionLog;
     use agent::DbThread;
-    use agent_client_protocol as acp;
-
+    use agent_client_protocol::schema as acp;
     use gpui::{TestAppContext, VisualTestContext};
     use project::FakeFs;
     use project::Project;

crates/agent_ui/src/threads_archive_view.rs 🔗

@@ -10,7 +10,7 @@ use crate::thread_metadata_store::{
 use crate::{Agent, ArchiveSelectedThread, DEFAULT_THREAD_TITLE, RemoveSelectedThread};
 
 use agent::ThreadStore;
-use agent_client_protocol as acp;
+use agent_client_protocol::schema as acp;
 use agent_settings::AgentSettings;
 use chrono::{DateTime, Datelike as _, Local, NaiveDate, TimeDelta, Utc};
 use collections::HashMap;

crates/agent_ui/src/ui/mention_crease.rs 🔗

@@ -1,7 +1,7 @@
 use std::{ops::RangeInclusive, path::PathBuf, time::Duration};
 
 use acp_thread::MentionUri;
-use agent_client_protocol as acp;
+use agent_client_protocol::schema as acp;
 use editor::{Editor, SelectionEffects, scroll::Autoscroll};
 use gpui::{
     Animation, AnimationExt, AnyView, Context, IntoElement, WeakEntity, Window, pulsating_between,

crates/eval_cli/src/main.rs 🔗

@@ -40,7 +40,7 @@ use std::time::{Duration, Instant};
 
 use acp_thread::AgentConnection as _;
 use agent::{NativeAgent, NativeAgentConnection, Templates, ThreadStore};
-use agent_client_protocol as acp;
+use agent_client_protocol::schema as acp;
 use anyhow::{Context, Result};
 use clap::Parser;
 use feature_flags::FeatureFlagAppExt as _;

crates/sidebar/src/sidebar.rs 🔗

@@ -2,7 +2,7 @@ mod thread_switcher;
 
 use acp_thread::ThreadStatus;
 use action_log::DiffStats;
-use agent_client_protocol::{self as acp};
+use agent_client_protocol::schema as acp;
 use agent_settings::AgentSettings;
 use agent_ui::thread_metadata_store::{
     ThreadMetadata, ThreadMetadataStore, WorktreePaths, worktree_info_from_thread_paths,

crates/sidebar/src/thread_switcher.rs 🔗

@@ -1,5 +1,5 @@
 use action_log::DiffStats;
-use agent_client_protocol as acp;
+use agent_client_protocol::schema as acp;
 use agent_ui::thread_metadata_store::ThreadMetadata;
 use gpui::{
     Action as _, DismissEvent, Entity, EventEmitter, FocusHandle, Focusable, Modifiers,

crates/zed/src/main.rs 🔗

@@ -5,7 +5,7 @@ mod reliability;
 mod zed;
 
 use agent::{SharedThread, ThreadStore};
-use agent_client_protocol;
+use agent_client_protocol::schema as acp;
 use agent_ui::AgentPanel;
 use anyhow::{Context as _, Result};
 use clap::Parser;
@@ -990,7 +990,7 @@ fn handle_open_request(request: OpenRequest, app_state: Arc<AppState>, cx: &mut
 
                     let shared_thread = SharedThread::from_bytes(&response.thread_data)?;
                     let db_thread = shared_thread.to_db_thread();
-                    let session_id = agent_client_protocol::SessionId::new(session_id);
+                    let session_id = acp::SessionId::new(session_id);
 
                     let save_session_id = session_id.clone();
 

crates/zed/src/visual_test_runner.rs 🔗

@@ -95,7 +95,7 @@ fn main() {
 #[cfg(target_os = "macos")]
 use {
     acp_thread::{AgentConnection, StubAgentConnection},
-    agent_client_protocol as acp,
+    agent_client_protocol::schema as acp,
     agent_servers::{AgentServer, AgentServerDelegate},
     anyhow::{Context as _, Result},
     assets::Assets,