Mostly, a fancy buffer pool implementation

Phillip Davis created

Change summary

Cargo.lock            | 51 +++++++++++++++++++++++++++++++
Cargo.toml            |  1 
src/buf_pool.rs       | 74 +++++++++++++++++++++++++++++++++++++++++++++
src/main.rs           |  1 
src/mcp/server.rs     |  3 +
src/mcp/tools/read.rs |  9 +++++
6 files changed, 139 insertions(+)

Detailed changes

Cargo.lock 🔗

@@ -372,6 +372,56 @@ version = "0.8.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
 
+[[package]]
+name = "crossbeam"
+version = "0.8.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8"
+dependencies = [
+ "crossbeam-channel",
+ "crossbeam-deque",
+ "crossbeam-epoch",
+ "crossbeam-queue",
+ "crossbeam-utils",
+]
+
+[[package]]
+name = "crossbeam-channel"
+version = "0.5.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2"
+dependencies = [
+ "crossbeam-utils",
+]
+
+[[package]]
+name = "crossbeam-deque"
+version = "0.8.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51"
+dependencies = [
+ "crossbeam-epoch",
+ "crossbeam-utils",
+]
+
+[[package]]
+name = "crossbeam-epoch"
+version = "0.9.18"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e"
+dependencies = [
+ "crossbeam-utils",
+]
+
+[[package]]
+name = "crossbeam-queue"
+version = "0.3.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115"
+dependencies = [
+ "crossbeam-utils",
+]
+
 [[package]]
 name = "crossbeam-utils"
 version = "0.8.21"
@@ -852,6 +902,7 @@ dependencies = [
  "async-lsp",
  "async-process",
  "clap",
+ "crossbeam",
  "futures",
  "rmcp",
  "serde",

Cargo.toml 🔗

@@ -9,6 +9,7 @@ async-gen = "0.2.3"
 async-lsp = "0.2.2"
 async-process = "2.3"
 clap = { version = "4.5.48", features = ["derive"] }
+crossbeam = "0.8"
 futures = "0.3.31"
 rmcp = { version = "0.8.0", features = ["server", "transport-io"] }
 serde = { version = "1.0", features = ["derive"] }

src/buf_pool.rs 🔗

@@ -0,0 +1,74 @@
+use crossbeam::queue::SegQueue;
+use std::ops::{Deref, DerefMut};
+use std::sync::Arc;
+
+/// A thread-safe, lock-free pool of reusable byte buffers.
+#[derive(Clone)]
+pub struct BufPool {
+    queue: Arc<SegQueue<Vec<u8>>>,
+}
+
+impl BufPool {
+    /// Creates a pool pre-populated with `capacity` empty buffers.
+    pub fn with_capacity(capacity: usize) -> Self {
+        let queue = Arc::new(SegQueue::new());
+        for _ in 0..capacity {
+            queue.push(Vec::new());
+        }
+        Self { queue }
+    }
+
+    /// Checks out a buffer from the pool. If the pool is empty, allocates a new buffer.
+    pub fn checkout(&self) -> BufGuard {
+        let buf = self.queue.pop().unwrap_or_else(|| Vec::new());
+        BufGuard {
+            buf,
+            pool: self.queue.clone(),
+        }
+    }
+}
+
+/// RAII guard that automatically returns the buffer to the pool when dropped.
+pub struct BufGuard {
+    buf: Vec<u8>,
+    pool: Arc<SegQueue<Vec<u8>>>,
+}
+
+impl BufGuard {
+    /// Extracts the buffer, preventing automatic return to the pool.
+    /// Useful if you need to move the buffer elsewhere.
+    pub fn into_inner(mut self) -> Vec<u8> {
+        let buf = std::mem::take(&mut self.buf);
+        // SAFETY:
+        // 1) We forget `self`, preventing it from
+        // double-free'ing `pool`
+        // 2) The 'pointer' is really a &mut, so
+        // it is aligned, type-valid, and valid for reads.
+        let pool = unsafe { std::ptr::read(&self.pool) };
+        std::mem::forget(self);
+        drop(pool);
+        buf
+    }
+}
+
+impl Drop for BufGuard {
+    fn drop(&mut self) {
+        let mut buf = std::mem::take(&mut self.buf);
+        buf.clear();
+        self.pool.push(buf);
+    }
+}
+
+impl Deref for BufGuard {
+    type Target = Vec<u8>;
+
+    fn deref(&self) -> &Self::Target {
+        &self.buf
+    }
+}
+
+impl DerefMut for BufGuard {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.buf
+    }
+}

src/main.rs 🔗

@@ -1,3 +1,4 @@
+mod buf_pool;
 mod config;
 mod lsp;
 mod mcp;

src/mcp/server.rs 🔗

@@ -11,6 +11,7 @@ use rmcp::{tool_handler, tool_router};
 
 use async_lsp::ServerSocket as LSPServerSocket;
 
+use crate::buf_pool::BufPool;
 use crate::config::Config;
 use crate::lsp::LSPClient;
 pub use crate::mcp::tools::read::*;
@@ -19,6 +20,7 @@ pub struct MCPServer {
     pub(crate) tool_router: ToolRouter<Self>,
     pub(crate) config: Arc<Config>,
     pub(crate) lsp_server: LSPServerSocket,
+    pub(crate) buf_pool: BufPool,
 }
 
 pub async fn setup(config: Arc<Config>) -> anyhow::Result<(MCPServer, LSPClient)> {
@@ -35,6 +37,7 @@ impl MCPServer {
             config,
             lsp_server,
             tool_router: Self::tool_router(),
+            buf_pool: BufPool::with_capacity(8),
         }
     }
 

src/mcp/tools/read.rs 🔗

@@ -29,6 +29,15 @@ pub async fn call(
     server: &MCPServer,
     Parameters(args): Parameters<ReadToolArgs>,
 ) -> Result<CallToolResult, MCPError> {
+    let file_path = server
+        .config
+        .path_in_project(&args.path)
+        .await
+        .map_err(|err| MCPError::invalid_request(format!("{:?}", err), None))?
+        .ok_or_else(|| {
+            MCPError::invalid_params(format!("Path not in project: {:?}", &args.path), None)
+        })?;
+
     Err(MCPError::internal_error("Not yet implemented", None))
     // let content = tokio::fs::read_to_string(&file_path)
     //     .await