Fix race conditions with LSP requests that return buffers

Max Brunsfeld and Nathan Sobo created

* Avoid panic when registering a buffer that was previously open,
  and whose weak handle was still present in the open_buffers map.
* Avoid releasing any buffers while a request is outstanding which
  could return a reference to a buffer.

Co-Authored-By: Nathan Sobo <nathan@zed.dev>

Change summary

crates/project/src/lsp_command.rs |  23 +++++-
crates/project/src/project.rs     | 117 +++++++++++++++++++++++++++++---
crates/server/src/rpc.rs          |  32 ++++++--
3 files changed, 145 insertions(+), 27 deletions(-)

Detailed changes

crates/project/src/lsp_command.rs 🔗

@@ -1,4 +1,4 @@
-use crate::{DocumentHighlight, Location, Project, ProjectTransaction};
+use crate::{BufferRequestHandle, DocumentHighlight, Location, Project, ProjectTransaction};
 use anyhow::{anyhow, Result};
 use async_trait::async_trait;
 use client::{proto, PeerId};
@@ -48,6 +48,7 @@ pub(crate) trait LspCommand: 'static + Sized {
         message: <Self::ProtoRequest as proto::RequestMessage>::Response,
         project: ModelHandle<Project>,
         buffer: ModelHandle<Buffer>,
+        request_handle: BufferRequestHandle,
         cx: AsyncAppContext,
     ) -> Result<Self::Response>;
     fn buffer_id_from_proto(message: &Self::ProtoRequest) -> u64;
@@ -161,6 +162,7 @@ impl LspCommand for PrepareRename {
         message: proto::PrepareRenameResponse,
         _: ModelHandle<Project>,
         buffer: ModelHandle<Buffer>,
+        _: BufferRequestHandle,
         mut cx: AsyncAppContext,
     ) -> Result<Option<Range<Anchor>>> {
         if message.can_rename {
@@ -277,6 +279,7 @@ impl LspCommand for PerformRename {
         message: proto::PerformRenameResponse,
         project: ModelHandle<Project>,
         _: ModelHandle<Buffer>,
+        request_handle: BufferRequestHandle,
         mut cx: AsyncAppContext,
     ) -> Result<ProjectTransaction> {
         let message = message
@@ -284,7 +287,12 @@ impl LspCommand for PerformRename {
             .ok_or_else(|| anyhow!("missing transaction"))?;
         project
             .update(&mut cx, |project, cx| {
-                project.deserialize_project_transaction(message, self.push_to_history, cx)
+                project.deserialize_project_transaction(
+                    message,
+                    self.push_to_history,
+                    request_handle,
+                    cx,
+                )
             })
             .await
     }
@@ -427,13 +435,16 @@ impl LspCommand for GetDefinition {
         message: proto::GetDefinitionResponse,
         project: ModelHandle<Project>,
         _: ModelHandle<Buffer>,
+        request_handle: BufferRequestHandle,
         mut cx: AsyncAppContext,
     ) -> Result<Vec<Location>> {
         let mut locations = Vec::new();
         for location in message.locations {
             let buffer = location.buffer.ok_or_else(|| anyhow!("missing buffer"))?;
             let buffer = project
-                .update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx))
+                .update(&mut cx, |this, cx| {
+                    this.deserialize_buffer(buffer, request_handle.clone(), cx)
+                })
                 .await?;
             let start = location
                 .start
@@ -575,13 +586,16 @@ impl LspCommand for GetReferences {
         message: proto::GetReferencesResponse,
         project: ModelHandle<Project>,
         _: ModelHandle<Buffer>,
+        request_handle: BufferRequestHandle,
         mut cx: AsyncAppContext,
     ) -> Result<Vec<Location>> {
         let mut locations = Vec::new();
         for location in message.locations {
             let buffer = location.buffer.ok_or_else(|| anyhow!("missing buffer"))?;
             let target_buffer = project
-                .update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx))
+                .update(&mut cx, |this, cx| {
+                    this.deserialize_buffer(buffer, request_handle.clone(), cx)
+                })
                 .await?;
             let start = location
                 .start
@@ -706,6 +720,7 @@ impl LspCommand for GetDocumentHighlights {
         message: proto::GetDocumentHighlightsResponse,
         _: ModelHandle<Project>,
         _: ModelHandle<Buffer>,
+        _: BufferRequestHandle,
         _: AsyncAppContext,
     ) -> Result<Vec<DocumentHighlight>> {
         Ok(message

crates/project/src/project.rs 🔗

@@ -25,12 +25,17 @@ use rand::prelude::*;
 use sha2::{Digest, Sha256};
 use smol::block_on;
 use std::{
+    cell::RefCell,
     convert::TryInto,
     hash::Hash,
     mem,
     ops::Range,
     path::{Component, Path, PathBuf},
-    sync::{atomic::AtomicBool, Arc},
+    rc::Rc,
+    sync::{
+        atomic::{AtomicBool, AtomicUsize, Ordering},
+        Arc,
+    },
     time::Instant,
 };
 use util::{post_inc, ResultExt, TryFutureExt as _};
@@ -58,6 +63,8 @@ pub struct Project {
         ProjectPath,
         postage::watch::Receiver<Option<Result<ModelHandle<Buffer>, Arc<anyhow::Error>>>>,
     >,
+    buffer_request_count: Rc<AtomicUsize>,
+    preserved_buffers: Rc<RefCell<Vec<ModelHandle<Buffer>>>>,
     shared_buffers: HashMap<PeerId, HashMap<u64, ModelHandle<Buffer>>>,
     nonce: u128,
 }
@@ -142,6 +149,11 @@ pub struct Symbol {
     pub signature: [u8; 32],
 }
 
+pub struct BufferRequestHandle {
+    buffer_request_count: Rc<AtomicUsize>,
+    preserved_buffers: Rc<RefCell<Vec<ModelHandle<Buffer>>>>,
+}
+
 #[derive(Default)]
 pub struct ProjectTransaction(pub HashMap<ModelHandle<Buffer>, language::Transaction>);
 
@@ -273,6 +285,7 @@ impl Project {
                 open_buffers: Default::default(),
                 loading_buffers: Default::default(),
                 shared_buffers: Default::default(),
+                preserved_buffers: Default::default(),
                 client_state: ProjectClientState::Local {
                     is_shared: false,
                     remote_id_tx,
@@ -288,6 +301,7 @@ impl Project {
                 fs,
                 language_servers_with_diagnostics_running: 0,
                 language_servers: Default::default(),
+                buffer_request_count: Default::default(),
                 started_language_servers: Default::default(),
                 nonce: StdRng::from_entropy().gen(),
             }
@@ -342,6 +356,8 @@ impl Project {
                 language_servers_with_diagnostics_running: 0,
                 language_servers: Default::default(),
                 started_language_servers: Default::default(),
+                buffer_request_count: Default::default(),
+                preserved_buffers: Default::default(),
                 nonce: StdRng::from_entropy().gen(),
             };
             for worktree in worktrees {
@@ -682,6 +698,7 @@ impl Project {
         let remote_worktree_id = worktree.read(cx).id();
         let path = path.clone();
         let path_string = path.to_string_lossy().to_string();
+        let request_handle = self.start_buffer_request(cx);
         cx.spawn(|this, mut cx| async move {
             let response = rpc
                 .request(proto::OpenBuffer {
@@ -691,8 +708,11 @@ impl Project {
                 })
                 .await?;
             let buffer = response.buffer.ok_or_else(|| anyhow!("missing buffer"))?;
-            this.update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx))
-                .await
+
+            this.update(&mut cx, |this, cx| {
+                this.deserialize_buffer(buffer, request_handle, cx)
+            })
+            .await
         })
     }
 
@@ -733,6 +753,21 @@ impl Project {
         })
     }
 
+    fn start_buffer_request(&self, cx: &AppContext) -> BufferRequestHandle {
+        if self.buffer_request_count.fetch_add(1, Ordering::SeqCst) == 0 {
+            self.preserved_buffers.borrow_mut().extend(
+                self.open_buffers
+                    .values()
+                    .filter_map(|buffer| buffer.upgrade(cx)),
+            )
+        }
+
+        BufferRequestHandle {
+            buffer_request_count: self.buffer_request_count.clone(),
+            preserved_buffers: self.preserved_buffers.clone(),
+        }
+    }
+
     pub fn save_buffer_as(
         &self,
         buffer: ModelHandle<Buffer>,
@@ -804,15 +839,23 @@ impl Project {
         worktree: Option<&ModelHandle<Worktree>>,
         cx: &mut ModelContext<Self>,
     ) -> Result<()> {
-        match self.open_buffers.insert(
-            buffer.read(cx).remote_id(),
-            OpenBuffer::Loaded(buffer.downgrade()),
-        ) {
+        let remote_id = buffer.read(cx).remote_id();
+        match self
+            .open_buffers
+            .insert(remote_id, OpenBuffer::Loaded(buffer.downgrade()))
+        {
             None => {}
             Some(OpenBuffer::Loading(operations)) => {
                 buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))?
             }
-            Some(OpenBuffer::Loaded(_)) => Err(anyhow!("registered the same buffer twice"))?,
+            Some(OpenBuffer::Loaded(existing_handle)) => {
+                if existing_handle.upgrade(cx).is_some() {
+                    Err(anyhow!(
+                        "already registered buffer with remote id {}",
+                        remote_id
+                    ))?
+                }
+            }
         }
         self.assign_language_to_buffer(&buffer, worktree, cx);
         Ok(())
@@ -1195,6 +1238,7 @@ impl Project {
 
         let remote_buffers = self.remote_id().zip(remote_buffers);
         let client = self.client.clone();
+        let request_handle = self.start_buffer_request(cx);
 
         cx.spawn(|this, mut cx| async move {
             let mut project_transaction = ProjectTransaction::default();
@@ -1213,7 +1257,12 @@ impl Project {
                     .ok_or_else(|| anyhow!("missing transaction"))?;
                 project_transaction = this
                     .update(&mut cx, |this, cx| {
-                        this.deserialize_project_transaction(response, push_to_history, cx)
+                        this.deserialize_project_transaction(
+                            response,
+                            push_to_history,
+                            request_handle,
+                            cx,
+                        )
                     })
                     .await?;
             }
@@ -1430,6 +1479,7 @@ impl Project {
                 cx,
             )
         } else if let Some(project_id) = self.remote_id() {
+            let request_handle = self.start_buffer_request(cx);
             let request = self.client.request(proto::OpenBufferForSymbol {
                 project_id,
                 symbol: Some(serialize_symbol(symbol)),
@@ -1437,8 +1487,10 @@ impl Project {
             cx.spawn(|this, mut cx| async move {
                 let response = request.await?;
                 let buffer = response.buffer.ok_or_else(|| anyhow!("invalid buffer"))?;
-                this.update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx))
-                    .await
+                this.update(&mut cx, |this, cx| {
+                    this.deserialize_buffer(buffer, request_handle, cx)
+                })
+                .await
             })
         } else {
             Task::ready(Err(anyhow!("project does not have a remote id")))
@@ -1817,6 +1869,7 @@ impl Project {
             })
         } else if let Some(project_id) = self.remote_id() {
             let client = self.client.clone();
+            let request_handle = self.start_buffer_request(cx);
             let request = proto::ApplyCodeAction {
                 project_id,
                 buffer_id: buffer_handle.read(cx).remote_id(),
@@ -1829,7 +1882,12 @@ impl Project {
                     .transaction
                     .ok_or_else(|| anyhow!("missing transaction"))?;
                 this.update(&mut cx, |this, cx| {
-                    this.deserialize_project_transaction(response, push_to_history, cx)
+                    this.deserialize_project_transaction(
+                        response,
+                        push_to_history,
+                        request_handle,
+                        cx,
+                    )
                 })
                 .await
             })
@@ -2020,11 +2078,12 @@ impl Project {
             }
         } else if let Some(project_id) = self.remote_id() {
             let rpc = self.client.clone();
+            let request_handle = self.start_buffer_request(cx);
             let message = request.to_proto(project_id, buffer);
             return cx.spawn(|this, cx| async move {
                 let response = rpc.request(message).await?;
                 request
-                    .response_from_proto(response, this, buffer_handle, cx)
+                    .response_from_proto(response, this, buffer_handle, request_handle, cx)
                     .await
             });
         }
@@ -2864,13 +2923,16 @@ impl Project {
         &mut self,
         message: proto::ProjectTransaction,
         push_to_history: bool,
+        request_handle: BufferRequestHandle,
         cx: &mut ModelContext<Self>,
     ) -> Task<Result<ProjectTransaction>> {
         cx.spawn(|this, mut cx| async move {
             let mut project_transaction = ProjectTransaction::default();
             for (buffer, transaction) in message.buffers.into_iter().zip(message.transactions) {
                 let buffer = this
-                    .update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx))
+                    .update(&mut cx, |this, cx| {
+                        this.deserialize_buffer(buffer, request_handle.clone(), cx)
+                    })
                     .await?;
                 let transaction = language::proto::deserialize_transaction(transaction)?;
                 project_transaction.0.insert(buffer, transaction);
@@ -2917,6 +2979,7 @@ impl Project {
     fn deserialize_buffer(
         &mut self,
         buffer: proto::Buffer,
+        request_handle: BufferRequestHandle,
         cx: &mut ModelContext<Self>,
     ) -> Task<Result<ModelHandle<Buffer>>> {
         let replica_id = self.replica_id();
@@ -2963,6 +3026,8 @@ impl Project {
                     let buffer = cx.add_model(|cx| {
                         Buffer::from_proto(replica_id, buffer, buffer_file, cx).unwrap()
                     });
+
+                    request_handle.preserve_buffer(buffer.clone());
                     this.update(&mut cx, |this, cx| {
                         this.register_buffer(&buffer, buffer_worktree.as_ref(), cx)
                     })?;
@@ -3111,6 +3176,30 @@ impl Project {
     }
 }
 
+impl BufferRequestHandle {
+    fn preserve_buffer(&self, buffer: ModelHandle<Buffer>) {
+        self.preserved_buffers.borrow_mut().push(buffer);
+    }
+}
+
+impl Clone for BufferRequestHandle {
+    fn clone(&self) -> Self {
+        self.buffer_request_count.fetch_add(1, Ordering::SeqCst);
+        Self {
+            buffer_request_count: self.buffer_request_count.clone(),
+            preserved_buffers: self.preserved_buffers.clone(),
+        }
+    }
+}
+
+impl Drop for BufferRequestHandle {
+    fn drop(&mut self) {
+        if self.buffer_request_count.fetch_sub(1, Ordering::SeqCst) == 1 {
+            self.preserved_buffers.borrow_mut().clear();
+        }
+    }
+}
+
 impl WorktreeHandle {
     pub fn upgrade(&self, cx: &AppContext) -> Option<ModelHandle<Worktree>> {
         match self {

crates/server/src/rpc.rs 🔗

@@ -4856,6 +4856,8 @@ mod tests {
                     cx.background().simulate_random_delay().await;
                 }
 
+                log::info!("Host done");
+
                 self.project = Some(project);
                 (self, cx)
             }
@@ -4887,15 +4889,25 @@ mod tests {
                     };
 
                     operations.set(operations.get() + 1);
-                    let project_path = worktree.read_with(&cx, |worktree, _| {
-                        let entry = worktree
-                            .entries(false)
-                            .filter(|e| e.is_file())
-                            .choose(&mut *rng.lock())
-                            .unwrap();
-                        (worktree.id(), entry.path.clone())
-                    });
-                    log::info!("Guest {}: opening path {:?}", guest_id, project_path);
+                    let (worktree_root_name, project_path) =
+                        worktree.read_with(&cx, |worktree, _| {
+                            let entry = worktree
+                                .entries(false)
+                                .filter(|e| e.is_file())
+                                .choose(&mut *rng.lock())
+                                .unwrap();
+                            (
+                                worktree.root_name().to_string(),
+                                (worktree.id(), entry.path.clone()),
+                            )
+                        });
+                    log::info!(
+                        "Guest {}: opening path in worktree {:?} {:?} {:?}",
+                        guest_id,
+                        project_path.0,
+                        worktree_root_name,
+                        project_path.1
+                    );
                     let buffer = project
                         .update(&mut cx, |project, cx| project.open_buffer(project_path, cx))
                         .await
@@ -5062,6 +5074,8 @@ mod tests {
                 cx.background().simulate_random_delay().await;
             }
 
+            log::info!("Guest {} done", guest_id);
+
             self.project = Some(project);
             (self, cx)
         }