Merge pull request #486 from zed-industries/background-highlights

Antonio Scandurra created

Move `GetDocumentHighlights` to the background and fix collaboration race conditions

Change summary

crates/client/src/client.rs       |  25 +
crates/editor/src/editor.rs       |  24 +-
crates/language/src/language.rs   |   6 
crates/language/src/tests.rs      |   4 
crates/lsp/src/lsp.rs             | 106 +++++----
crates/project/src/lsp_command.rs |  23 +
crates/project/src/project.rs     | 237 +++++++++++++++++------
crates/project/src/worktree.rs    | 180 +++++++----------
crates/rpc/proto/zed.proto        |  19 -
crates/rpc/src/proto.rs           |  13 
crates/server/src/rpc.rs          | 327 ++++++++++++++++++--------------
crates/server/src/rpc/store.rs    |  94 ++------
12 files changed, 591 insertions(+), 467 deletions(-)

Detailed changes

crates/client/src/client.rs 🔗

@@ -530,10 +530,13 @@ impl Client {
                 let cx = cx.clone();
                 let this = self.clone();
                 async move {
+                    let mut message_id = 0_usize;
                     while let Some(message) = incoming.next().await {
                         let mut state = this.state.write();
-                        let payload_type_id = message.payload_type_id();
+                        message_id += 1;
                         let type_name = message.payload_type_name();
+                        let payload_type_id = message.payload_type_id();
+                        let sender_id = message.original_sender_id().map(|id| id.0);
 
                         let model = state
                             .models_by_message_type
@@ -575,8 +578,10 @@ impl Client {
 
                             let client_id = this.id;
                             log::debug!(
-                                "rpc message received. client_id:{}, name:{}",
+                                "rpc message received. client_id:{}, message_id:{}, sender_id:{:?}, type:{}",
                                 client_id,
+                                message_id,
+                                sender_id,
                                 type_name
                             );
                             cx.foreground()
@@ -584,15 +589,19 @@ impl Client {
                                     match future.await {
                                         Ok(()) => {
                                             log::debug!(
-                                                "rpc message handled. client_id:{}, name:{}",
+                                                "rpc message handled. client_id:{}, message_id:{}, sender_id:{:?}, type:{}",
                                                 client_id,
+                                                message_id,
+                                                sender_id,
                                                 type_name
                                             );
                                         }
                                         Err(error) => {
                                             log::error!(
-                                                "error handling message. client_id:{}, name:{}, {}",
+                                                "error handling message. client_id:{}, message_id:{}, sender_id:{:?}, type:{}, error:{:?}",
                                                 client_id,
+                                                message_id,
+                                                sender_id,
                                                 type_name,
                                                 error
                                             );
@@ -827,7 +836,7 @@ impl Client {
     ) -> impl Future<Output = Result<T::Response>> {
         let client_id = self.id;
         log::debug!(
-            "rpc request start. client_id: {}. name:{}",
+            "rpc request start. client_id:{}. name:{}",
             client_id,
             T::NAME
         );
@@ -837,7 +846,7 @@ impl Client {
         async move {
             let response = response?.await;
             log::debug!(
-                "rpc request finish. client_id: {}. name:{}",
+                "rpc request finish. client_id:{}. name:{}",
                 client_id,
                 T::NAME
             );
@@ -846,7 +855,7 @@ impl Client {
     }
 
     fn respond<T: RequestMessage>(&self, receipt: Receipt<T>, response: T::Response) -> Result<()> {
-        log::debug!("rpc respond. client_id: {}. name:{}", self.id, T::NAME);
+        log::debug!("rpc respond. client_id:{}. name:{}", self.id, T::NAME);
         self.peer.respond(receipt, response)
     }
 
@@ -855,7 +864,7 @@ impl Client {
         receipt: Receipt<T>,
         error: proto::Error,
     ) -> Result<()> {
-        log::debug!("rpc respond. client_id: {}. name:{}", self.id, T::NAME);
+        log::debug!("rpc respond. client_id:{}. name:{}", self.id, T::NAME);
         self.peer.respond_with_error(receipt, error)
     }
 }

crates/editor/src/editor.rs 🔗

@@ -8143,16 +8143,18 @@ mod tests {
     #[gpui::test]
     async fn test_completion(mut cx: gpui::TestAppContext) {
         let settings = cx.read(EditorSettings::test);
-        let (language_server, mut fake) = lsp::LanguageServer::fake_with_capabilities(
-            lsp::ServerCapabilities {
-                completion_provider: Some(lsp::CompletionOptions {
-                    trigger_characters: Some(vec![".".to_string(), ":".to_string()]),
+        let (language_server, mut fake) = cx.update(|cx| {
+            lsp::LanguageServer::fake_with_capabilities(
+                lsp::ServerCapabilities {
+                    completion_provider: Some(lsp::CompletionOptions {
+                        trigger_characters: Some(vec![".".to_string(), ":".to_string()]),
+                        ..Default::default()
+                    }),
                     ..Default::default()
-                }),
-                ..Default::default()
-            },
-            cx.background(),
-        );
+                },
+                cx,
+            )
+        });
 
         let text = "
             one
@@ -8318,7 +8320,7 @@ mod tests {
             position: Point,
             completions: Vec<(Range<Point>, &'static str)>,
         ) {
-            fake.handle_request::<lsp::request::Completion, _>(move |params| {
+            fake.handle_request::<lsp::request::Completion, _>(move |params, _| {
                 assert_eq!(
                     params.text_document_position.text_document.uri,
                     lsp::Url::from_file_path(path).unwrap()
@@ -8352,7 +8354,7 @@ mod tests {
             fake: &mut FakeLanguageServer,
             edit: Option<(Range<Point>, &'static str)>,
         ) {
-            fake.handle_request::<lsp::request::ResolveCompletionItem, _>(move |_| {
+            fake.handle_request::<lsp::request::ResolveCompletionItem, _>(move |_, _| {
                 lsp::CompletionItem {
                     additional_text_edits: edit.clone().map(|(range, new_text)| {
                         vec![lsp::TextEdit::new(

crates/language/src/language.rs 🔗

@@ -13,7 +13,7 @@ use futures::{
     future::{BoxFuture, Shared},
     FutureExt, TryFutureExt,
 };
-use gpui::{AppContext, Task};
+use gpui::{MutableAppContext, Task};
 use highlight_map::HighlightMap;
 use lazy_static::lazy_static;
 use parking_lot::{Mutex, RwLock};
@@ -225,7 +225,7 @@ impl LanguageRegistry {
         language: &Arc<Language>,
         root_path: Arc<Path>,
         http_client: Arc<dyn HttpClient>,
-        cx: &AppContext,
+        cx: &mut MutableAppContext,
     ) -> Option<Task<Result<Arc<lsp::LanguageServer>>>> {
         #[cfg(any(test, feature = "test-support"))]
         if let Some(config) = &language.config.language_server {
@@ -234,7 +234,7 @@ impl LanguageRegistry {
 
                 let (server, mut fake_server) = lsp::LanguageServer::fake_with_capabilities(
                     fake_config.capabilities.clone(),
-                    cx.background().clone(),
+                    cx,
                 );
 
                 if let Some(initalizer) = &fake_config.initializer {

crates/language/src/tests.rs 🔗

@@ -554,7 +554,7 @@ fn test_autoindent_adjusts_lines_when_only_text_changes(cx: &mut MutableAppConte
 
 #[gpui::test]
 async fn test_diagnostics(mut cx: gpui::TestAppContext) {
-    let (language_server, mut fake) = lsp::LanguageServer::fake(cx.background());
+    let (language_server, mut fake) = cx.update(lsp::LanguageServer::fake);
     let mut rust_lang = rust_lang();
     rust_lang.config.language_server = Some(LanguageServerConfig {
         disk_based_diagnostic_sources: HashSet::from_iter(["disk".to_string()]),
@@ -837,7 +837,7 @@ async fn test_diagnostics(mut cx: gpui::TestAppContext) {
 
 #[gpui::test]
 async fn test_edits_from_lsp_with_past_version(mut cx: gpui::TestAppContext) {
-    let (language_server, mut fake) = lsp::LanguageServer::fake(cx.background());
+    let (language_server, mut fake) = cx.update(lsp::LanguageServer::fake);
 
     let text = "
         fn a() {

crates/lsp/src/lsp.rs 🔗

@@ -483,36 +483,47 @@ impl Drop for Subscription {
 
 #[cfg(any(test, feature = "test-support"))]
 pub struct FakeLanguageServer {
-    handlers:
-        Arc<Mutex<HashMap<&'static str, Box<dyn Send + Sync + FnMut(usize, &[u8]) -> Vec<u8>>>>>,
+    handlers: Arc<
+        Mutex<
+            HashMap<
+                &'static str,
+                Box<dyn Send + FnMut(usize, &[u8], gpui::AsyncAppContext) -> Vec<u8>>,
+            >,
+        >,
+    >,
     outgoing_tx: futures::channel::mpsc::UnboundedSender<Vec<u8>>,
     incoming_rx: futures::channel::mpsc::UnboundedReceiver<Vec<u8>>,
 }
 
 #[cfg(any(test, feature = "test-support"))]
 impl LanguageServer {
-    pub fn fake(executor: Arc<gpui::executor::Background>) -> (Arc<Self>, FakeLanguageServer) {
-        Self::fake_with_capabilities(Default::default(), executor)
+    pub fn fake(cx: &mut gpui::MutableAppContext) -> (Arc<Self>, FakeLanguageServer) {
+        Self::fake_with_capabilities(Default::default(), cx)
     }
 
     pub fn fake_with_capabilities(
         capabilities: ServerCapabilities,
-        executor: Arc<gpui::executor::Background>,
+        cx: &mut gpui::MutableAppContext,
     ) -> (Arc<Self>, FakeLanguageServer) {
         let (stdin_writer, stdin_reader) = async_pipe::pipe();
         let (stdout_writer, stdout_reader) = async_pipe::pipe();
 
-        let mut fake = FakeLanguageServer::new(executor.clone(), stdin_reader, stdout_writer);
+        let mut fake = FakeLanguageServer::new(stdin_reader, stdout_writer, cx);
         fake.handle_request::<request::Initialize, _>({
             let capabilities = capabilities.clone();
-            move |_| InitializeResult {
+            move |_, _| InitializeResult {
                 capabilities: capabilities.clone(),
                 ..Default::default()
             }
         });
 
-        let server =
-            Self::new_internal(stdin_writer, stdout_reader, Path::new("/"), executor).unwrap();
+        let server = Self::new_internal(
+            stdin_writer,
+            stdout_reader,
+            Path::new("/"),
+            cx.background().clone(),
+        )
+        .unwrap();
 
         (server, fake)
     }
@@ -521,9 +532,9 @@ impl LanguageServer {
 #[cfg(any(test, feature = "test-support"))]
 impl FakeLanguageServer {
     fn new(
-        background: Arc<gpui::executor::Background>,
         stdin: async_pipe::PipeReader,
         stdout: async_pipe::PipeWriter,
+        cx: &mut gpui::MutableAppContext,
     ) -> Self {
         use futures::StreamExt as _;
 
@@ -537,43 +548,42 @@ impl FakeLanguageServer {
 
         // Receive incoming messages
         let handlers = this.handlers.clone();
-        let executor = background.clone();
-        background
-            .spawn(async move {
-                let mut buffer = Vec::new();
-                let mut stdin = smol::io::BufReader::new(stdin);
-                while Self::receive(&mut stdin, &mut buffer).await.is_ok() {
-                    executor.simulate_random_delay().await;
-                    if let Ok(request) = serde_json::from_slice::<AnyRequest>(&buffer) {
-                        assert_eq!(request.jsonrpc, JSON_RPC_VERSION);
-
-                        if let Some(handler) = handlers.lock().get_mut(request.method) {
-                            let response = handler(request.id, request.params.get().as_bytes());
-                            log::debug!("handled lsp request. method:{}", request.method);
-                            outgoing_tx.unbounded_send(response)?;
-                        } else {
-                            log::debug!("unhandled lsp request. method:{}", request.method);
-                            outgoing_tx.unbounded_send(
-                                serde_json::to_vec(&AnyResponse {
-                                    id: request.id,
-                                    error: Some(Error {
-                                        message: "no handler".to_string(),
-                                    }),
-                                    result: None,
-                                })
-                                .unwrap(),
-                            )?;
-                        }
+        cx.spawn(|cx| async move {
+            let mut buffer = Vec::new();
+            let mut stdin = smol::io::BufReader::new(stdin);
+            while Self::receive(&mut stdin, &mut buffer).await.is_ok() {
+                cx.background().simulate_random_delay().await;
+                if let Ok(request) = serde_json::from_slice::<AnyRequest>(&buffer) {
+                    assert_eq!(request.jsonrpc, JSON_RPC_VERSION);
+
+                    if let Some(handler) = handlers.lock().get_mut(request.method) {
+                        let response =
+                            handler(request.id, request.params.get().as_bytes(), cx.clone());
+                        log::debug!("handled lsp request. method:{}", request.method);
+                        outgoing_tx.unbounded_send(response)?;
                     } else {
-                        incoming_tx.unbounded_send(buffer.clone())?;
+                        log::debug!("unhandled lsp request. method:{}", request.method);
+                        outgoing_tx.unbounded_send(
+                            serde_json::to_vec(&AnyResponse {
+                                id: request.id,
+                                error: Some(Error {
+                                    message: "no handler".to_string(),
+                                }),
+                                result: None,
+                            })
+                            .unwrap(),
+                        )?;
                     }
+                } else {
+                    incoming_tx.unbounded_send(buffer.clone())?;
                 }
-                Ok::<_, anyhow::Error>(())
-            })
-            .detach();
+            }
+            Ok::<_, anyhow::Error>(())
+        })
+        .detach();
 
         // Send outgoing messages
-        background
+        cx.background()
             .spawn(async move {
                 let mut stdout = smol::io::BufWriter::new(stdout);
                 while let Some(notification) = outgoing_rx.next().await {
@@ -618,13 +628,13 @@ impl FakeLanguageServer {
     ) -> futures::channel::mpsc::UnboundedReceiver<()>
     where
         T: 'static + request::Request,
-        F: 'static + Send + Sync + FnMut(T::Params) -> T::Result,
+        F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext) -> T::Result,
     {
         let (responded_tx, responded_rx) = futures::channel::mpsc::unbounded();
         self.handlers.lock().insert(
             T::METHOD,
-            Box::new(move |id, params| {
-                let result = handler(serde_json::from_slice::<T::Params>(params).unwrap());
+            Box::new(move |id, params, cx| {
+                let result = handler(serde_json::from_slice::<T::Params>(params).unwrap(), cx);
                 let result = serde_json::to_string(&result).unwrap();
                 let result = serde_json::from_str::<&RawValue>(&result).unwrap();
                 let response = AnyResponse {
@@ -709,8 +719,8 @@ mod tests {
     }
 
     #[gpui::test]
-    async fn test_fake(cx: TestAppContext) {
-        let (server, mut fake) = LanguageServer::fake(cx.background());
+    async fn test_fake(mut cx: TestAppContext) {
+        let (server, mut fake) = cx.update(LanguageServer::fake);
 
         let (message_tx, message_rx) = channel::unbounded();
         let (diagnostics_tx, diagnostics_rx) = channel::unbounded();
@@ -762,7 +772,7 @@ mod tests {
             "file://b/c"
         );
 
-        fake.handle_request::<request::Shutdown, _>(|_| ());
+        fake.handle_request::<request::Shutdown, _>(|_, _| ());
 
         drop(server);
         fake.receive_notification::<notification::Exit>().await;

crates/project/src/lsp_command.rs 🔗

@@ -1,4 +1,4 @@
-use crate::{DocumentHighlight, Location, Project, ProjectTransaction};
+use crate::{BufferRequestHandle, DocumentHighlight, Location, Project, ProjectTransaction};
 use anyhow::{anyhow, Result};
 use async_trait::async_trait;
 use client::{proto, PeerId};
@@ -48,6 +48,7 @@ pub(crate) trait LspCommand: 'static + Sized {
         message: <Self::ProtoRequest as proto::RequestMessage>::Response,
         project: ModelHandle<Project>,
         buffer: ModelHandle<Buffer>,
+        request_handle: BufferRequestHandle,
         cx: AsyncAppContext,
     ) -> Result<Self::Response>;
     fn buffer_id_from_proto(message: &Self::ProtoRequest) -> u64;
@@ -161,6 +162,7 @@ impl LspCommand for PrepareRename {
         message: proto::PrepareRenameResponse,
         _: ModelHandle<Project>,
         buffer: ModelHandle<Buffer>,
+        _: BufferRequestHandle,
         mut cx: AsyncAppContext,
     ) -> Result<Option<Range<Anchor>>> {
         if message.can_rename {
@@ -277,6 +279,7 @@ impl LspCommand for PerformRename {
         message: proto::PerformRenameResponse,
         project: ModelHandle<Project>,
         _: ModelHandle<Buffer>,
+        request_handle: BufferRequestHandle,
         mut cx: AsyncAppContext,
     ) -> Result<ProjectTransaction> {
         let message = message
@@ -284,7 +287,12 @@ impl LspCommand for PerformRename {
             .ok_or_else(|| anyhow!("missing transaction"))?;
         project
             .update(&mut cx, |project, cx| {
-                project.deserialize_project_transaction(message, self.push_to_history, cx)
+                project.deserialize_project_transaction(
+                    message,
+                    self.push_to_history,
+                    request_handle,
+                    cx,
+                )
             })
             .await
     }
@@ -427,13 +435,16 @@ impl LspCommand for GetDefinition {
         message: proto::GetDefinitionResponse,
         project: ModelHandle<Project>,
         _: ModelHandle<Buffer>,
+        request_handle: BufferRequestHandle,
         mut cx: AsyncAppContext,
     ) -> Result<Vec<Location>> {
         let mut locations = Vec::new();
         for location in message.locations {
             let buffer = location.buffer.ok_or_else(|| anyhow!("missing buffer"))?;
             let buffer = project
-                .update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx))
+                .update(&mut cx, |this, cx| {
+                    this.deserialize_buffer(buffer, request_handle.clone(), cx)
+                })
                 .await?;
             let start = location
                 .start
@@ -575,13 +586,16 @@ impl LspCommand for GetReferences {
         message: proto::GetReferencesResponse,
         project: ModelHandle<Project>,
         _: ModelHandle<Buffer>,
+        request_handle: BufferRequestHandle,
         mut cx: AsyncAppContext,
     ) -> Result<Vec<Location>> {
         let mut locations = Vec::new();
         for location in message.locations {
             let buffer = location.buffer.ok_or_else(|| anyhow!("missing buffer"))?;
             let target_buffer = project
-                .update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx))
+                .update(&mut cx, |this, cx| {
+                    this.deserialize_buffer(buffer, request_handle.clone(), cx)
+                })
                 .await?;
             let start = location
                 .start
@@ -706,6 +720,7 @@ impl LspCommand for GetDocumentHighlights {
         message: proto::GetDocumentHighlightsResponse,
         _: ModelHandle<Project>,
         _: ModelHandle<Buffer>,
+        _: BufferRequestHandle,
         _: AsyncAppContext,
     ) -> Result<Vec<DocumentHighlight>> {
         Ok(message

crates/project/src/project.rs 🔗

@@ -25,11 +25,13 @@ use rand::prelude::*;
 use sha2::{Digest, Sha256};
 use smol::block_on;
 use std::{
+    cell::RefCell,
     convert::TryInto,
     hash::Hash,
     mem,
     ops::Range,
     path::{Component, Path, PathBuf},
+    rc::Rc,
     sync::{atomic::AtomicBool, Arc},
     time::Instant,
 };
@@ -52,16 +54,23 @@ pub struct Project {
     collaborators: HashMap<PeerId, Collaborator>,
     subscriptions: Vec<client::Subscription>,
     language_servers_with_diagnostics_running: isize,
-    open_buffers: HashMap<u64, OpenBuffer>,
     opened_buffer: broadcast::Sender<()>,
     loading_buffers: HashMap<
         ProjectPath,
         postage::watch::Receiver<Option<Result<ModelHandle<Buffer>, Arc<anyhow::Error>>>>,
     >,
+    buffers_state: Rc<RefCell<ProjectBuffers>>,
     shared_buffers: HashMap<PeerId, HashMap<u64, ModelHandle<Buffer>>>,
     nonce: u128,
 }
 
+#[derive(Default)]
+struct ProjectBuffers {
+    buffer_request_count: usize,
+    preserved_buffers: Vec<ModelHandle<Buffer>>,
+    open_buffers: HashMap<u64, OpenBuffer>,
+}
+
 enum OpenBuffer {
     Loaded(WeakModelHandle<Buffer>),
     Loading(Vec<Operation>),
@@ -142,6 +151,8 @@ pub struct Symbol {
     pub signature: [u8; 32],
 }
 
+pub struct BufferRequestHandle(Rc<RefCell<ProjectBuffers>>);
+
 #[derive(Default)]
 pub struct ProjectTransaction(pub HashMap<ModelHandle<Buffer>, language::Transaction>);
 
@@ -169,7 +180,7 @@ impl DiagnosticSummary {
         this
     }
 
-    pub fn to_proto(&self, path: Arc<Path>) -> proto::DiagnosticSummary {
+    pub fn to_proto(&self, path: &Path) -> proto::DiagnosticSummary {
         proto::DiagnosticSummary {
             path: path.to_string_lossy().to_string(),
             error_count: self.error_count as u32,
@@ -195,7 +206,7 @@ impl Project {
         client.add_entity_message_handler(Self::handle_disk_based_diagnostics_updated);
         client.add_entity_message_handler(Self::handle_disk_based_diagnostics_updating);
         client.add_entity_message_handler(Self::handle_remove_collaborator);
-        client.add_entity_message_handler(Self::handle_share_worktree);
+        client.add_entity_message_handler(Self::handle_register_worktree);
         client.add_entity_message_handler(Self::handle_unregister_worktree);
         client.add_entity_message_handler(Self::handle_unshare_project);
         client.add_entity_message_handler(Self::handle_update_buffer_file);
@@ -270,7 +281,7 @@ impl Project {
             Self {
                 worktrees: Default::default(),
                 collaborators: Default::default(),
-                open_buffers: Default::default(),
+                buffers_state: Default::default(),
                 loading_buffers: Default::default(),
                 shared_buffers: Default::default(),
                 client_state: ProjectClientState::Local {
@@ -323,7 +334,6 @@ impl Project {
         let this = cx.add_model(|cx| {
             let mut this = Self {
                 worktrees: Vec::new(),
-                open_buffers: Default::default(),
                 loading_buffers: Default::default(),
                 opened_buffer: broadcast::channel(1).0,
                 shared_buffers: Default::default(),
@@ -342,6 +352,7 @@ impl Project {
                 language_servers_with_diagnostics_running: 0,
                 language_servers: Default::default(),
                 started_language_servers: Default::default(),
+                buffers_state: Default::default(),
                 nonce: StdRng::from_entropy().gen(),
             };
             for worktree in worktrees {
@@ -390,7 +401,9 @@ impl Project {
 
     #[cfg(any(test, feature = "test-support"))]
     pub fn has_buffered_operations(&self) -> bool {
-        self.open_buffers
+        self.buffers_state
+            .borrow()
+            .open_buffers
             .values()
             .any(|buffer| matches!(buffer, OpenBuffer::Loading(_)))
     }
@@ -621,11 +634,6 @@ impl Project {
                     *tx.borrow_mut() = Some(this.update(&mut cx, |this, _| {
                         // Record the fact that the buffer is no longer loading.
                         this.loading_buffers.remove(&project_path);
-                        if this.loading_buffers.is_empty() {
-                            this.open_buffers
-                                .retain(|_, buffer| matches!(buffer, OpenBuffer::Loaded(_)))
-                        }
-
                         let buffer = load_result.map_err(Arc::new)?;
                         Ok(buffer)
                     }));
@@ -682,6 +690,7 @@ impl Project {
         let remote_worktree_id = worktree.read(cx).id();
         let path = path.clone();
         let path_string = path.to_string_lossy().to_string();
+        let request_handle = self.start_buffer_request(cx);
         cx.spawn(|this, mut cx| async move {
             let response = rpc
                 .request(proto::OpenBuffer {
@@ -691,8 +700,11 @@ impl Project {
                 })
                 .await?;
             let buffer = response.buffer.ok_or_else(|| anyhow!("missing buffer"))?;
-            this.update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx))
-                .await
+
+            this.update(&mut cx, |this, cx| {
+                this.deserialize_buffer(buffer, request_handle, cx)
+            })
+            .await
         })
     }
 
@@ -733,6 +745,10 @@ impl Project {
         })
     }
 
+    fn start_buffer_request(&self, cx: &AppContext) -> BufferRequestHandle {
+        BufferRequestHandle::new(self.buffers_state.clone(), cx)
+    }
+
     pub fn save_buffer_as(
         &self,
         buffer: ModelHandle<Buffer>,
@@ -761,40 +777,47 @@ impl Project {
     pub fn has_open_buffer(&self, path: impl Into<ProjectPath>, cx: &AppContext) -> bool {
         let path = path.into();
         if let Some(worktree) = self.worktree_for_id(path.worktree_id, cx) {
-            self.open_buffers.iter().any(|(_, buffer)| {
-                if let Some(buffer) = buffer.upgrade(cx) {
-                    if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
-                        if file.worktree == worktree && file.path() == &path.path {
-                            return true;
+            self.buffers_state
+                .borrow()
+                .open_buffers
+                .iter()
+                .any(|(_, buffer)| {
+                    if let Some(buffer) = buffer.upgrade(cx) {
+                        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
+                            if file.worktree == worktree && file.path() == &path.path {
+                                return true;
+                            }
                         }
                     }
-                }
-                false
-            })
+                    false
+                })
         } else {
             false
         }
     }
 
-    fn get_open_buffer(
+    pub fn get_open_buffer(
         &mut self,
         path: &ProjectPath,
         cx: &mut ModelContext<Self>,
     ) -> Option<ModelHandle<Buffer>> {
         let mut result = None;
         let worktree = self.worktree_for_id(path.worktree_id, cx)?;
-        self.open_buffers.retain(|_, buffer| {
-            if let Some(buffer) = buffer.upgrade(cx) {
-                if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
-                    if file.worktree == worktree && file.path() == &path.path {
-                        result = Some(buffer);
+        self.buffers_state
+            .borrow_mut()
+            .open_buffers
+            .retain(|_, buffer| {
+                if let Some(buffer) = buffer.upgrade(cx) {
+                    if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
+                        if file.worktree == worktree && file.path() == &path.path {
+                            result = Some(buffer);
+                        }
                     }
+                    true
+                } else {
+                    false
                 }
-                true
-            } else {
-                false
-            }
-        });
+            });
         result
     }
 
@@ -804,15 +827,25 @@ impl Project {
         worktree: Option<&ModelHandle<Worktree>>,
         cx: &mut ModelContext<Self>,
     ) -> Result<()> {
-        match self.open_buffers.insert(
-            buffer.read(cx).remote_id(),
-            OpenBuffer::Loaded(buffer.downgrade()),
-        ) {
+        let remote_id = buffer.read(cx).remote_id();
+        match self
+            .buffers_state
+            .borrow_mut()
+            .open_buffers
+            .insert(remote_id, OpenBuffer::Loaded(buffer.downgrade()))
+        {
             None => {}
             Some(OpenBuffer::Loading(operations)) => {
                 buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))?
             }
-            Some(OpenBuffer::Loaded(_)) => Err(anyhow!("registered the same buffer twice"))?,
+            Some(OpenBuffer::Loaded(existing_handle)) => {
+                if existing_handle.upgrade(cx).is_some() {
+                    Err(anyhow!(
+                        "already registered buffer with remote id {}",
+                        remote_id
+                    ))?
+                }
+            }
         }
         self.assign_language_to_buffer(&buffer, worktree, cx);
         Ok(())
@@ -1132,7 +1165,7 @@ impl Project {
             path: relative_path.into(),
         };
 
-        for buffer in self.open_buffers.values() {
+        for buffer in self.buffers_state.borrow().open_buffers.values() {
             if let Some(buffer) = buffer.upgrade(cx) {
                 if buffer
                     .read(cx)
@@ -1195,6 +1228,7 @@ impl Project {
 
         let remote_buffers = self.remote_id().zip(remote_buffers);
         let client = self.client.clone();
+        let request_handle = self.start_buffer_request(cx);
 
         cx.spawn(|this, mut cx| async move {
             let mut project_transaction = ProjectTransaction::default();
@@ -1213,7 +1247,12 @@ impl Project {
                     .ok_or_else(|| anyhow!("missing transaction"))?;
                 project_transaction = this
                     .update(&mut cx, |this, cx| {
-                        this.deserialize_project_transaction(response, push_to_history, cx)
+                        this.deserialize_project_transaction(
+                            response,
+                            push_to_history,
+                            request_handle,
+                            cx,
+                        )
                     })
                     .await?;
             }
@@ -1430,6 +1469,7 @@ impl Project {
                 cx,
             )
         } else if let Some(project_id) = self.remote_id() {
+            let request_handle = self.start_buffer_request(cx);
             let request = self.client.request(proto::OpenBufferForSymbol {
                 project_id,
                 symbol: Some(serialize_symbol(symbol)),
@@ -1437,8 +1477,10 @@ impl Project {
             cx.spawn(|this, mut cx| async move {
                 let response = request.await?;
                 let buffer = response.buffer.ok_or_else(|| anyhow!("invalid buffer"))?;
-                this.update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx))
-                    .await
+                this.update(&mut cx, |this, cx| {
+                    this.deserialize_buffer(buffer, request_handle, cx)
+                })
+                .await
             })
         } else {
             Task::ready(Err(anyhow!("project does not have a remote id")))
@@ -1817,6 +1859,7 @@ impl Project {
             })
         } else if let Some(project_id) = self.remote_id() {
             let client = self.client.clone();
+            let request_handle = self.start_buffer_request(cx);
             let request = proto::ApplyCodeAction {
                 project_id,
                 buffer_id: buffer_handle.read(cx).remote_id(),
@@ -1829,7 +1872,12 @@ impl Project {
                     .transaction
                     .ok_or_else(|| anyhow!("missing transaction"))?;
                 this.update(&mut cx, |this, cx| {
-                    this.deserialize_project_transaction(response, push_to_history, cx)
+                    this.deserialize_project_transaction(
+                        response,
+                        push_to_history,
+                        request_handle,
+                        cx,
+                    )
                 })
                 .await
             })
@@ -2020,11 +2068,12 @@ impl Project {
             }
         } else if let Some(project_id) = self.remote_id() {
             let rpc = self.client.clone();
+            let request_handle = self.start_buffer_request(cx);
             let message = request.to_proto(project_id, buffer);
             return cx.spawn(|this, cx| async move {
                 let response = rpc.request(message).await?;
                 request
-                    .response_from_proto(response, this, buffer_handle, cx)
+                    .response_from_proto(response, this, buffer_handle, request_handle, cx)
                     .await
             });
         }
@@ -2047,7 +2096,7 @@ impl Project {
         }
     }
 
-    fn find_local_worktree(
+    pub fn find_local_worktree(
         &self,
         abs_path: &Path,
         cx: &AppContext,
@@ -2152,7 +2201,7 @@ impl Project {
     ) {
         let snapshot = worktree_handle.read(cx).snapshot();
         let mut buffers_to_delete = Vec::new();
-        for (buffer_id, buffer) in &self.open_buffers {
+        for (buffer_id, buffer) in &self.buffers_state.borrow().open_buffers {
             if let Some(buffer) = buffer.upgrade(cx) {
                 buffer.update(cx, |buffer, cx| {
                     if let Some(old_file) = File::from_dyn(buffer.file()) {
@@ -2209,7 +2258,10 @@ impl Project {
         }
 
         for buffer_id in buffers_to_delete {
-            self.open_buffers.remove(&buffer_id);
+            self.buffers_state
+                .borrow_mut()
+                .open_buffers
+                .remove(&buffer_id);
         }
     }
 
@@ -2337,7 +2389,7 @@ impl Project {
                 .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?
                 .replica_id;
             this.shared_buffers.remove(&peer_id);
-            for (_, buffer) in &this.open_buffers {
+            for (_, buffer) in &this.buffers_state.borrow().open_buffers {
                 if let Some(buffer) = buffer.upgrade(cx) {
                     buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
                 }
@@ -2347,19 +2399,22 @@ impl Project {
         })
     }
 
-    async fn handle_share_worktree(
+    async fn handle_register_worktree(
         this: ModelHandle<Self>,
-        envelope: TypedEnvelope<proto::ShareWorktree>,
+        envelope: TypedEnvelope<proto::RegisterWorktree>,
         client: Arc<Client>,
         mut cx: AsyncAppContext,
     ) -> Result<()> {
         this.update(&mut cx, |this, cx| {
             let remote_id = this.remote_id().ok_or_else(|| anyhow!("invalid project"))?;
             let replica_id = this.replica_id();
-            let worktree = envelope
-                .payload
-                .worktree
-                .ok_or_else(|| anyhow!("invalid worktree"))?;
+            let worktree = proto::Worktree {
+                id: envelope.payload.worktree_id,
+                root_name: envelope.payload.root_name,
+                entries: Default::default(),
+                diagnostic_summaries: Default::default(),
+                weak: envelope.payload.weak,
+            };
             let (worktree, load_task) =
                 Worktree::remote(remote_id, replica_id, worktree, client, cx);
             this.add_worktree(&worktree, cx);
@@ -2461,17 +2516,21 @@ impl Project {
                 .map(|op| language::proto::deserialize_operation(op))
                 .collect::<Result<Vec<_>, _>>()?;
             let is_remote = this.is_remote();
-            match this.open_buffers.entry(buffer_id) {
+            let mut buffers_state = this.buffers_state.borrow_mut();
+            let buffer_request_count = buffers_state.buffer_request_count;
+            match buffers_state.open_buffers.entry(buffer_id) {
                 hash_map::Entry::Occupied(mut e) => match e.get_mut() {
                     OpenBuffer::Loaded(buffer) => {
                         if let Some(buffer) = buffer.upgrade(cx) {
                             buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
+                        } else if is_remote && buffer_request_count > 0 {
+                            e.insert(OpenBuffer::Loading(ops));
                         }
                     }
                     OpenBuffer::Loading(operations) => operations.extend_from_slice(&ops),
                 },
                 hash_map::Entry::Vacant(e) => {
-                    if is_remote && this.loading_buffers.len() > 0 {
+                    if is_remote && buffer_request_count > 0 {
                         e.insert(OpenBuffer::Loading(ops));
                     }
                 }
@@ -2495,6 +2554,8 @@ impl Project {
                 .ok_or_else(|| anyhow!("no such worktree"))?;
             let file = File::from_proto(file, worktree.clone(), cx)?;
             let buffer = this
+                .buffers_state
+                .borrow_mut()
                 .open_buffers
                 .get_mut(&buffer_id)
                 .and_then(|b| b.upgrade(cx))
@@ -2861,17 +2922,21 @@ impl Project {
         &mut self,
         message: proto::ProjectTransaction,
         push_to_history: bool,
+        request_handle: BufferRequestHandle,
         cx: &mut ModelContext<Self>,
     ) -> Task<Result<ProjectTransaction>> {
         cx.spawn(|this, mut cx| async move {
             let mut project_transaction = ProjectTransaction::default();
             for (buffer, transaction) in message.buffers.into_iter().zip(message.transactions) {
                 let buffer = this
-                    .update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx))
+                    .update(&mut cx, |this, cx| {
+                        this.deserialize_buffer(buffer, request_handle.clone(), cx)
+                    })
                     .await?;
                 let transaction = language::proto::deserialize_transaction(transaction)?;
                 project_transaction.0.insert(buffer, transaction);
             }
+
             for (buffer, transaction) in &project_transaction.0 {
                 buffer
                     .update(&mut cx, |buffer, _| {
@@ -2914,6 +2979,7 @@ impl Project {
     fn deserialize_buffer(
         &mut self,
         buffer: proto::Buffer,
+        request_handle: BufferRequestHandle,
         cx: &mut ModelContext<Self>,
     ) -> Task<Result<ModelHandle<Buffer>>> {
         let replica_id = self.replica_id();
@@ -2925,7 +2991,9 @@ impl Project {
                 proto::buffer::Variant::Id(id) => {
                     let buffer = loop {
                         let buffer = this.read_with(&cx, |this, cx| {
-                            this.open_buffers
+                            this.buffers_state
+                                .borrow()
+                                .open_buffers
                                 .get(&id)
                                 .and_then(|buffer| buffer.upgrade(cx))
                         });
@@ -2960,6 +3028,8 @@ impl Project {
                     let buffer = cx.add_model(|cx| {
                         Buffer::from_proto(replica_id, buffer, buffer_file, cx).unwrap()
                     });
+
+                    request_handle.preserve_buffer(buffer.clone());
                     this.update(&mut cx, |this, cx| {
                         this.register_buffer(&buffer, buffer_worktree.as_ref(), cx)
                     })?;
@@ -3032,6 +3102,8 @@ impl Project {
 
         this.update(&mut cx, |this, cx| {
             let buffer = this
+                .buffers_state
+                .borrow()
                 .open_buffers
                 .get(&envelope.payload.buffer_id)
                 .and_then(|buffer| buffer.upgrade(cx));
@@ -3058,6 +3130,8 @@ impl Project {
             .into();
         this.update(&mut cx, |this, cx| {
             let buffer = this
+                .buffers_state
+                .borrow()
                 .open_buffers
                 .get(&payload.buffer_id)
                 .and_then(|buffer| buffer.upgrade(cx));
@@ -3108,6 +3182,48 @@ impl Project {
     }
 }
 
+impl BufferRequestHandle {
+    fn new(state: Rc<RefCell<ProjectBuffers>>, cx: &AppContext) -> Self {
+        {
+            let state = &mut *state.borrow_mut();
+            state.buffer_request_count += 1;
+            if state.buffer_request_count == 1 {
+                state.preserved_buffers.extend(
+                    state
+                        .open_buffers
+                        .values()
+                        .filter_map(|buffer| buffer.upgrade(cx)),
+                )
+            }
+        }
+        Self(state)
+    }
+
+    fn preserve_buffer(&self, buffer: ModelHandle<Buffer>) {
+        self.0.borrow_mut().preserved_buffers.push(buffer);
+    }
+}
+
+impl Clone for BufferRequestHandle {
+    fn clone(&self) -> Self {
+        self.0.borrow_mut().buffer_request_count += 1;
+        Self(self.0.clone())
+    }
+}
+
+impl Drop for BufferRequestHandle {
+    fn drop(&mut self) {
+        let mut state = self.0.borrow_mut();
+        state.buffer_request_count -= 1;
+        if state.buffer_request_count == 0 {
+            state.preserved_buffers.clear();
+            state
+                .open_buffers
+                .retain(|_, buffer| matches!(buffer, OpenBuffer::Loaded(_)))
+        }
+    }
+}
+
 impl WorktreeHandle {
     pub fn upgrade(&self, cx: &AppContext) -> Option<ModelHandle<Worktree>> {
         match self {
@@ -3612,7 +3728,7 @@ mod tests {
             .unwrap();
 
         let mut fake_server = fake_servers.next().await.unwrap();
-        fake_server.handle_request::<lsp::request::GotoDefinition, _>(move |params| {
+        fake_server.handle_request::<lsp::request::GotoDefinition, _>(move |params, _| {
             let params = params.text_document_position_params;
             assert_eq!(
                 params.text_document.uri.to_file_path().unwrap(),
@@ -3885,7 +4001,6 @@ mod tests {
                 &initial_snapshot,
                 1,
                 1,
-                0,
                 true,
             );
             remote
@@ -4504,7 +4619,7 @@ mod tests {
             project.prepare_rename(buffer.clone(), 7, cx)
         });
         fake_server
-            .handle_request::<lsp::request::PrepareRenameRequest, _>(|params| {
+            .handle_request::<lsp::request::PrepareRenameRequest, _>(|params, _| {
                 assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
                 assert_eq!(params.position, lsp::Position::new(0, 7));
                 Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
@@ -4523,7 +4638,7 @@ mod tests {
             project.perform_rename(buffer.clone(), 7, "THREE".to_string(), true, cx)
         });
         fake_server
-            .handle_request::<lsp::request::Rename, _>(|params| {
+            .handle_request::<lsp::request::Rename, _>(|params, _| {
                 assert_eq!(
                     params.text_document_position.text_document.uri.as_str(),
                     "file:///dir/one.rs"

crates/project/src/worktree.rs 🔗

@@ -7,7 +7,7 @@ use ::ignore::gitignore::{Gitignore, GitignoreBuilder};
 use anyhow::{anyhow, Context, Result};
 use client::{proto, Client, TypedEnvelope};
 use clock::ReplicaId;
-use collections::{HashMap, VecDeque};
+use collections::HashMap;
 use futures::{
     channel::mpsc::{self, UnboundedSender},
     Stream, StreamExt,
@@ -43,7 +43,7 @@ use std::{
     time::{Duration, SystemTime},
 };
 use sum_tree::{Bias, Edit, SeekTarget, SumTree, TreeMap};
-use util::ResultExt;
+use util::{ResultExt, TryFutureExt};
 
 lazy_static! {
     static ref GITIGNORE: &'static OsStr = OsStr::new(".gitignore");
@@ -84,8 +84,6 @@ pub struct RemoteWorktree {
     queued_operations: Vec<(u64, Operation)>,
     diagnostic_summaries: TreeMap<PathKey, DiagnosticSummary>,
     weak: bool,
-    next_update_id: u64,
-    pending_updates: VecDeque<proto::UpdateWorktree>,
 }
 
 #[derive(Clone)]
@@ -138,7 +136,7 @@ enum Registration {
 struct ShareState {
     project_id: u64,
     snapshots_tx: Sender<LocalSnapshot>,
-    _maintain_remote_snapshot: Option<Task<()>>,
+    _maintain_remote_snapshot: Option<Task<Option<()>>>,
 }
 
 #[derive(Default, Deserialize)]
@@ -239,8 +237,6 @@ impl Worktree {
                     }),
                 ),
                 weak,
-                next_update_id: worktree.next_update_id,
-                pending_updates: Default::default(),
             })
         });
 
@@ -739,6 +735,7 @@ impl LocalWorktree {
             worktree_id: self.id().to_proto(),
             root_name: self.root_name().to_string(),
             authorized_logins: self.authorized_logins(),
+            weak: self.weak,
         };
         cx.spawn(|this, mut cx| async move {
             let response = client.request(register_message).await;
@@ -762,68 +759,75 @@ impl LocalWorktree {
         &mut self,
         project_id: u64,
         cx: &mut ModelContext<Worktree>,
-    ) -> Task<anyhow::Result<()>> {
+    ) -> impl Future<Output = Result<()>> {
+        let (mut share_tx, mut share_rx) = oneshot::channel();
         if self.share.is_some() {
-            return Task::ready(Ok(()));
-        }
+            let _ = share_tx.try_send(Ok(()));
+        } else {
+            let snapshot = self.snapshot();
+            let rpc = self.client.clone();
+            let worktree_id = cx.model_id() as u64;
+            let (snapshots_to_send_tx, snapshots_to_send_rx) =
+                smol::channel::unbounded::<LocalSnapshot>();
+            let maintain_remote_snapshot = cx.background().spawn({
+                let rpc = rpc.clone();
+                let snapshot = snapshot.clone();
+                let diagnostic_summaries = self.diagnostic_summaries.clone();
+                async move {
+                    if let Err(error) = rpc
+                        .request(proto::UpdateWorktree {
+                            project_id,
+                            worktree_id,
+                            root_name: snapshot.root_name().to_string(),
+                            updated_entries: snapshot
+                                .entries_by_path
+                                .iter()
+                                .filter(|e| !e.is_ignored)
+                                .map(Into::into)
+                                .collect(),
+                            removed_entries: Default::default(),
+                        })
+                        .await
+                    {
+                        let _ = share_tx.try_send(Err(error));
+                        return Err(anyhow!("failed to send initial update worktree"));
+                    } else {
+                        let _ = share_tx.try_send(Ok(()));
+                    }
 
-        let snapshot = self.snapshot();
-        let rpc = self.client.clone();
-        let worktree_id = cx.model_id() as u64;
-        let (snapshots_to_send_tx, snapshots_to_send_rx) =
-            smol::channel::unbounded::<LocalSnapshot>();
-        let (mut share_tx, mut share_rx) = oneshot::channel();
-        let maintain_remote_snapshot = cx.background().spawn({
-            let rpc = rpc.clone();
-            let snapshot = snapshot.clone();
-            let diagnostic_summaries = self.diagnostic_summaries.clone();
-            let weak = self.weak;
-            async move {
-                if let Err(error) = rpc
-                    .request(proto::ShareWorktree {
-                        project_id,
-                        worktree: Some(snapshot.to_proto(&diagnostic_summaries, weak)),
-                    })
-                    .await
-                {
-                    let _ = share_tx.try_send(Err(error));
-                    return;
-                } else {
-                    let _ = share_tx.try_send(Ok(()));
-                }
+                    for (path, summary) in diagnostic_summaries.iter() {
+                        rpc.send(proto::UpdateDiagnosticSummary {
+                            project_id,
+                            worktree_id,
+                            summary: Some(summary.to_proto(&path.0)),
+                        })?;
+                    }
 
-                let mut update_id = 0;
-                let mut prev_snapshot = snapshot;
-                while let Ok(snapshot) = snapshots_to_send_rx.recv().await {
-                    let message = snapshot.build_update(
-                        &prev_snapshot,
-                        project_id,
-                        worktree_id,
-                        update_id,
-                        false,
-                    );
-                    match rpc.request(message).await {
-                        Ok(_) => {
-                            prev_snapshot = snapshot;
-                            update_id += 1;
-                        }
-                        Err(err) => log::error!("error sending snapshot diff {}", err),
+                    let mut prev_snapshot = snapshot;
+                    while let Ok(snapshot) = snapshots_to_send_rx.recv().await {
+                        let message =
+                            snapshot.build_update(&prev_snapshot, project_id, worktree_id, false);
+                        rpc.request(message).await?;
+                        prev_snapshot = snapshot;
                     }
+
+                    Ok::<_, anyhow::Error>(())
                 }
-            }
-        });
-        self.share = Some(ShareState {
-            project_id,
-            snapshots_tx: snapshots_to_send_tx,
-            _maintain_remote_snapshot: Some(maintain_remote_snapshot),
-        });
+                .log_err()
+            });
+            self.share = Some(ShareState {
+                project_id,
+                snapshots_tx: snapshots_to_send_tx,
+                _maintain_remote_snapshot: Some(maintain_remote_snapshot),
+            });
+        }
 
-        cx.foreground().spawn(async move {
-            match share_rx.next().await {
-                Some(result) => result,
-                None => Err(anyhow!("unshared before sharing completed")),
-            }
-        })
+        async move {
+            share_rx
+                .next()
+                .await
+                .unwrap_or_else(|| Err(anyhow!("share ended")))
+        }
     }
 
     pub fn unshare(&mut self) {
@@ -844,38 +848,13 @@ impl RemoteWorktree {
         &mut self,
         envelope: TypedEnvelope<proto::UpdateWorktree>,
     ) -> Result<()> {
-        let update = envelope.payload;
-        if update.id > self.next_update_id {
-            let ix = match self
-                .pending_updates
-                .binary_search_by_key(&update.id, |pending| pending.id)
-            {
-                Ok(ix) | Err(ix) => ix,
-            };
-            self.pending_updates.insert(ix, update);
-        } else {
-            let tx = self.updates_tx.clone();
-            self.next_update_id += 1;
-            tx.unbounded_send(update)
-                .expect("consumer runs to completion");
-            while let Some(update) = self.pending_updates.front() {
-                if update.id == self.next_update_id {
-                    self.next_update_id += 1;
-                    tx.unbounded_send(self.pending_updates.pop_front().unwrap())
-                        .expect("consumer runs to completion");
-                } else {
-                    break;
-                }
-            }
-        }
+        self.updates_tx
+            .unbounded_send(envelope.payload)
+            .expect("consumer runs to completion");
 
         Ok(())
     }
 
-    pub fn has_pending_updates(&self) -> bool {
-        !self.pending_updates.is_empty()
-    }
-
     pub fn update_diagnostic_summary(
         &mut self,
         path: Arc<Path>,
@@ -1038,6 +1017,7 @@ impl Snapshot {
 }
 
 impl LocalSnapshot {
+    #[cfg(test)]
     pub(crate) fn to_proto(
         &self,
         diagnostic_summaries: &TreeMap<PathKey, DiagnosticSummary>,
@@ -1055,10 +1035,9 @@ impl LocalSnapshot {
                 .collect(),
             diagnostic_summaries: diagnostic_summaries
                 .iter()
-                .map(|(path, summary)| summary.to_proto(path.0.clone()))
+                .map(|(path, summary)| summary.to_proto(&path.0))
                 .collect(),
             weak,
-            next_update_id: 0,
         }
     }
 
@@ -1067,7 +1046,6 @@ impl LocalSnapshot {
         other: &Self,
         project_id: u64,
         worktree_id: u64,
-        update_id: u64,
         include_ignored: bool,
     ) -> proto::UpdateWorktree {
         let mut updated_entries = Vec::new();
@@ -1120,7 +1098,6 @@ impl LocalSnapshot {
         }
 
         proto::UpdateWorktree {
-            id: update_id as u64,
             project_id,
             worktree_id,
             root_name: self.root_name().to_string(),
@@ -2461,7 +2438,7 @@ mod tests {
         fmt::Write,
         time::{SystemTime, UNIX_EPOCH},
     };
-    use util::{post_inc, test::temp_tree};
+    use util::test::temp_tree;
 
     #[gpui::test]
     async fn test_traversal(cx: gpui::TestAppContext) {
@@ -2646,7 +2623,6 @@ mod tests {
             new_scanner.snapshot().to_vec(true)
         );
 
-        let mut update_id = 0;
         for mut prev_snapshot in snapshots {
             let include_ignored = rng.gen::<bool>();
             if !include_ignored {
@@ -2667,13 +2643,9 @@ mod tests {
                 prev_snapshot.entries_by_id.edit(entries_by_id_edits, &());
             }
 
-            let update = scanner.snapshot().build_update(
-                &prev_snapshot,
-                0,
-                0,
-                post_inc(&mut update_id),
-                include_ignored,
-            );
+            let update = scanner
+                .snapshot()
+                .build_update(&prev_snapshot, 0, 0, include_ignored);
             prev_snapshot.apply_remote_update(update).unwrap();
             assert_eq!(
                 prev_snapshot.to_vec(true),

crates/rpc/proto/zed.proto 🔗

@@ -34,7 +34,6 @@ message Envelope {
 
         RegisterWorktree register_worktree = 28;
         UnregisterWorktree unregister_worktree = 29;
-        ShareWorktree share_worktree = 30;
         UpdateWorktree update_worktree = 31;
         UpdateDiagnosticSummary update_diagnostic_summary = 32;
         DiskBasedDiagnosticsUpdating disk_based_diagnostics_updating = 33;
@@ -132,6 +131,7 @@ message RegisterWorktree {
     uint64 worktree_id = 2;
     string root_name = 3;
     repeated string authorized_logins = 4;
+    bool weak = 5;
 }
 
 message UnregisterWorktree {
@@ -139,18 +139,12 @@ message UnregisterWorktree {
     uint64 worktree_id = 2;
 }
 
-message ShareWorktree {
-    uint64 project_id = 1;
-    Worktree worktree = 2;
-}
-
 message UpdateWorktree {
-    uint64 id = 1;
-    uint64 project_id = 2;
-    uint64 worktree_id = 3;
-    string root_name = 4;
-    repeated Entry updated_entries = 5;
-    repeated uint64 removed_entries = 6;
+    uint64 project_id = 1;
+    uint64 worktree_id = 2;
+    string root_name = 3;
+    repeated Entry updated_entries = 4;
+    repeated uint64 removed_entries = 5;
 }
 
 message AddProjectCollaborator {
@@ -494,7 +488,6 @@ message Worktree {
     repeated Entry entries = 3;
     repeated DiagnosticSummary diagnostic_summaries = 4;
     bool weak = 5;
-    uint64 next_update_id = 6;
 }
 
 message File {

crates/rpc/src/proto.rs 🔗

@@ -37,6 +37,7 @@ pub trait AnyTypedEnvelope: 'static + Send + Sync {
     fn as_any(&self) -> &dyn Any;
     fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
     fn is_background(&self) -> bool;
+    fn original_sender_id(&self) -> Option<PeerId>;
 }
 
 pub enum MessagePriority {
@@ -64,6 +65,10 @@ impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
     fn is_background(&self) -> bool {
         matches!(T::PRIORITY, MessagePriority::Background)
     }
+
+    fn original_sender_id(&self) -> Option<PeerId> {
+        self.original_sender_id
+    }
 }
 
 macro_rules! messages {
@@ -157,8 +162,8 @@ messages!(
     (GetCompletionsResponse, Foreground),
     (GetDefinition, Foreground),
     (GetDefinitionResponse, Foreground),
-    (GetDocumentHighlights, Foreground),
-    (GetDocumentHighlightsResponse, Foreground),
+    (GetDocumentHighlights, Background),
+    (GetDocumentHighlightsResponse, Background),
     (GetReferences, Foreground),
     (GetReferencesResponse, Foreground),
     (GetProjectSymbols, Background),
@@ -188,7 +193,6 @@ messages!(
     (SendChannelMessage, Foreground),
     (SendChannelMessageResponse, Foreground),
     (ShareProject, Foreground),
-    (ShareWorktree, Foreground),
     (Test, Foreground),
     (UnregisterProject, Foreground),
     (UnregisterWorktree, Foreground),
@@ -228,7 +232,6 @@ request_messages!(
     (SaveBuffer, BufferSaved),
     (SendChannelMessage, SendChannelMessageResponse),
     (ShareProject, Ack),
-    (ShareWorktree, Ack),
     (Test, Test),
     (UpdateBuffer, Ack),
     (UpdateWorktree, Ack),
@@ -259,12 +262,12 @@ entity_messages!(
     PrepareRename,
     RemoveProjectCollaborator,
     SaveBuffer,
-    ShareWorktree,
     UnregisterWorktree,
     UnshareProject,
     UpdateBuffer,
     UpdateBufferFile,
     UpdateDiagnosticSummary,
+    RegisterWorktree,
     UpdateWorktree,
 );
 

crates/server/src/rpc.rs 🔗

@@ -16,7 +16,7 @@ use rpc::{
     Connection, ConnectionId, Peer, TypedEnvelope,
 };
 use sha1::{Digest as _, Sha1};
-use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
+use std::{any::TypeId, future::Future, sync::Arc, time::Instant};
 use store::{Store, Worktree};
 use surf::StatusCode;
 use tide::log;
@@ -73,7 +73,6 @@ impl Server {
             .add_message_handler(Server::leave_project)
             .add_request_handler(Server::register_worktree)
             .add_message_handler(Server::unregister_worktree)
-            .add_request_handler(Server::share_worktree)
             .add_request_handler(Server::update_worktree)
             .add_message_handler(Server::update_diagnostic_summary)
             .add_message_handler(Server::disk_based_diagnostics_updating)
@@ -335,22 +334,21 @@ impl Server {
                     replica_id: 0,
                     user_id: joined.project.host_user_id.to_proto(),
                 });
-                let worktrees = joined
-                    .project
+                let worktrees = share
                     .worktrees
                     .iter()
-                    .filter_map(|(id, worktree)| {
-                        worktree.share.as_ref().map(|share| proto::Worktree {
+                    .filter_map(|(id, shared_worktree)| {
+                        let worktree = joined.project.worktrees.get(&id)?;
+                        Some(proto::Worktree {
                             id: *id,
                             root_name: worktree.root_name.clone(),
-                            entries: share.entries.values().cloned().collect(),
-                            diagnostic_summaries: share
+                            entries: shared_worktree.entries.values().cloned().collect(),
+                            diagnostic_summaries: shared_worktree
                                 .diagnostic_summaries
                                 .values()
                                 .cloned()
                                 .collect(),
                             weak: worktree.weak,
-                            next_update_id: share.next_update_id as u64,
                         })
                     })
                     .collect();
@@ -420,23 +418,33 @@ impl Server {
 
         let mut contact_user_ids = HashSet::default();
         contact_user_ids.insert(host_user_id);
-        for github_login in request.payload.authorized_logins {
-            let contact_user_id = self.app_state.db.create_user(&github_login, false).await?;
+        for github_login in &request.payload.authorized_logins {
+            let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
             contact_user_ids.insert(contact_user_id);
         }
 
         let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
-        self.state_mut().register_worktree(
-            request.payload.project_id,
-            request.payload.worktree_id,
-            request.sender_id,
-            Worktree {
-                authorized_user_ids: contact_user_ids.clone(),
-                root_name: request.payload.root_name,
-                share: None,
-                weak: false,
-            },
-        )?;
+        let guest_connection_ids;
+        {
+            let mut state = self.state_mut();
+            guest_connection_ids = state
+                .read_project(request.payload.project_id, request.sender_id)?
+                .guest_connection_ids();
+            state.register_worktree(
+                request.payload.project_id,
+                request.payload.worktree_id,
+                request.sender_id,
+                Worktree {
+                    authorized_user_ids: contact_user_ids.clone(),
+                    root_name: request.payload.root_name.clone(),
+                    weak: request.payload.weak,
+                },
+            )?;
+        }
+        broadcast(request.sender_id, guest_connection_ids, |connection_id| {
+            self.peer
+                .forward_send(request.sender_id, connection_id, request.payload.clone())
+        })?;
         self.update_contacts_for_users(&contact_user_ids)?;
         Ok(proto::Ack {})
     }
@@ -463,48 +471,6 @@ impl Server {
         Ok(())
     }
 
-    async fn share_worktree(
-        mut self: Arc<Server>,
-        mut request: TypedEnvelope<proto::ShareWorktree>,
-    ) -> tide::Result<proto::Ack> {
-        let worktree = request
-            .payload
-            .worktree
-            .as_mut()
-            .ok_or_else(|| anyhow!("missing worktree"))?;
-        let entries = worktree
-            .entries
-            .iter()
-            .map(|entry| (entry.id, entry.clone()))
-            .collect();
-        let diagnostic_summaries = worktree
-            .diagnostic_summaries
-            .iter()
-            .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
-            .collect();
-
-        let shared_worktree = self.state_mut().share_worktree(
-            request.payload.project_id,
-            worktree.id,
-            request.sender_id,
-            entries,
-            diagnostic_summaries,
-            worktree.next_update_id,
-        )?;
-
-        broadcast(
-            request.sender_id,
-            shared_worktree.connection_ids,
-            |connection_id| {
-                self.peer
-                    .forward_send(request.sender_id, connection_id, request.payload.clone())
-            },
-        )?;
-        self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?;
-
-        Ok(proto::Ack {})
-    }
-
     async fn update_worktree(
         mut self: Arc<Server>,
         request: TypedEnvelope<proto::UpdateWorktree>,
@@ -513,7 +479,6 @@ impl Server {
             request.sender_id,
             request.payload.project_id,
             request.payload.worktree_id,
-            request.payload.id,
             &request.payload.removed_entries,
             &request.payload.updated_entries,
         )?;
@@ -1198,7 +1163,7 @@ mod tests {
         cell::Cell,
         env,
         ops::Deref,
-        path::Path,
+        path::{Path, PathBuf},
         rc::Rc,
         sync::{
             atomic::{AtomicBool, Ordering::SeqCst},
@@ -1218,7 +1183,7 @@ mod tests {
         fs::{FakeFs, Fs as _},
         language::{
             tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
-            LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
+            LanguageConfig, LanguageRegistry, LanguageServerConfig, Point, ToLspPosition,
         },
         lsp,
         project::{DiagnosticSummary, Project, ProjectPath},
@@ -2149,16 +2114,14 @@ mod tests {
                 let worktree = store
                     .project(project_id)
                     .unwrap()
+                    .share
+                    .as_ref()
+                    .unwrap()
                     .worktrees
                     .get(&worktree_id.to_proto())
                     .unwrap();
 
-                !worktree
-                    .share
-                    .as_ref()
-                    .unwrap()
-                    .diagnostic_summaries
-                    .is_empty()
+                !worktree.diagnostic_summaries.is_empty()
             })
             .await;
 
@@ -2389,7 +2352,7 @@ mod tests {
         // Return some completions from the host's language server.
         cx_a.foreground().start_waiting();
         fake_language_server
-            .handle_request::<lsp::request::Completion, _>(|params| {
+            .handle_request::<lsp::request::Completion, _>(|params, _| {
                 assert_eq!(
                     params.text_document_position.text_document.uri,
                     lsp::Url::from_file_path("/a/main.rs").unwrap(),
@@ -2455,23 +2418,28 @@ mod tests {
 
         // Return a resolved completion from the host's language server.
         // The resolved completion has an additional text edit.
-        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(|params| {
-            assert_eq!(params.label, "first_method(…)");
-            lsp::CompletionItem {
-                label: "first_method(…)".into(),
-                detail: Some("fn(&mut self, B) -> C".into()),
-                text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
-                    new_text: "first_method($1)".to_string(),
-                    range: lsp::Range::new(lsp::Position::new(0, 14), lsp::Position::new(0, 14)),
-                })),
-                additional_text_edits: Some(vec![lsp::TextEdit {
-                    new_text: "use d::SomeTrait;\n".to_string(),
-                    range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
-                }]),
-                insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
-                ..Default::default()
-            }
-        });
+        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(
+            |params, _| {
+                assert_eq!(params.label, "first_method(…)");
+                lsp::CompletionItem {
+                    label: "first_method(…)".into(),
+                    detail: Some("fn(&mut self, B) -> C".into()),
+                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
+                        new_text: "first_method($1)".to_string(),
+                        range: lsp::Range::new(
+                            lsp::Position::new(0, 14),
+                            lsp::Position::new(0, 14),
+                        ),
+                    })),
+                    additional_text_edits: Some(vec![lsp::TextEdit {
+                        new_text: "use d::SomeTrait;\n".to_string(),
+                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
+                    }]),
+                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
+                    ..Default::default()
+                }
+            },
+        );
 
         // The additional edit is applied.
         buffer_a
@@ -2568,7 +2536,7 @@ mod tests {
         });
 
         let mut fake_language_server = fake_language_servers.next().await.unwrap();
-        fake_language_server.handle_request::<lsp::request::Formatting, _>(|_| {
+        fake_language_server.handle_request::<lsp::request::Formatting, _>(|_, _| {
             Some(vec![
                 lsp::TextEdit {
                     range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
@@ -2677,7 +2645,7 @@ mod tests {
         let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
 
         let mut fake_language_server = fake_language_servers.next().await.unwrap();
-        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
+        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
             Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
                 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
                 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
@@ -2702,7 +2670,7 @@ mod tests {
         // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
         // the previous call to `definition`.
         let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
-        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
+        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
             Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
                 lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
                 lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
@@ -2826,7 +2794,7 @@ mod tests {
         let references = project_b.update(&mut cx_b, |p, cx| p.references(&buffer_b, 7, cx));
 
         let mut fake_language_server = fake_language_servers.next().await.unwrap();
-        fake_language_server.handle_request::<lsp::request::References, _>(|params| {
+        fake_language_server.handle_request::<lsp::request::References, _>(|params, _| {
             assert_eq!(
                 params.text_document_position.text_document.uri.as_str(),
                 "file:///root-1/one.rs"
@@ -2954,7 +2922,7 @@ mod tests {
 
         let mut fake_language_server = fake_language_servers.next().await.unwrap();
         fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _>(
-            |params| {
+            |params, _| {
                 assert_eq!(
                     params
                         .text_document_position_params
@@ -3103,7 +3071,7 @@ mod tests {
         // Request the definition of a symbol as the guest.
         let symbols = project_b.update(&mut cx_b, |p, cx| p.symbols("two", cx));
         let mut fake_language_server = fake_language_servers.next().await.unwrap();
-        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _>(|_| {
+        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _>(|_, _| {
             #[allow(deprecated)]
             Some(vec![lsp::SymbolInformation {
                 name: "TWO".into(),
@@ -3245,7 +3213,7 @@ mod tests {
         }
 
         let mut fake_language_server = fake_language_servers.next().await.unwrap();
-        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
+        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
             Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
                 lsp::Url::from_file_path("/root/b.rs").unwrap(),
                 lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
@@ -3353,7 +3321,7 @@ mod tests {
 
         let mut fake_language_server = fake_language_servers.next().await.unwrap();
         fake_language_server
-            .handle_request::<lsp::request::CodeActionRequest, _>(|params| {
+            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
                 assert_eq!(
                     params.text_document.uri,
                     lsp::Url::from_file_path("/a/main.rs").unwrap(),
@@ -3372,7 +3340,7 @@ mod tests {
         });
 
         fake_language_server
-            .handle_request::<lsp::request::CodeActionRequest, _>(|params| {
+            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
                 assert_eq!(
                     params.text_document.uri,
                     lsp::Url::from_file_path("/a/main.rs").unwrap(),
@@ -3443,7 +3411,7 @@ mod tests {
                 Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
             })
             .unwrap();
-        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_| {
+        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_, _| {
             lsp::CodeAction {
                 title: "Inline into all callers".to_string(),
                 edit: Some(lsp::WorkspaceEdit {
@@ -3598,7 +3566,7 @@ mod tests {
         });
 
         fake_language_server
-            .handle_request::<lsp::request::PrepareRenameRequest, _>(|params| {
+            .handle_request::<lsp::request::PrepareRenameRequest, _>(|params, _| {
                 assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
                 assert_eq!(params.position, lsp::Position::new(0, 7));
                 Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
@@ -3628,7 +3596,7 @@ mod tests {
             Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
         });
         fake_language_server
-            .handle_request::<lsp::request::Rename, _>(|params| {
+            .handle_request::<lsp::request::Rename, _>(|params, _| {
                 assert_eq!(
                     params.text_document_position.text_document.uri.as_str(),
                     "file:///dir/one.rs"
@@ -4412,12 +4380,6 @@ mod tests {
                             .worktrees(cx)
                             .map(|worktree| {
                                 let worktree = worktree.read(cx);
-                                assert!(
-                                    !worktree.as_remote().unwrap().has_pending_updates(),
-                                    "Guest {} worktree {:?} contains deferred updates",
-                                    guest_id,
-                                    worktree.id()
-                                );
                                 (worktree.id(), worktree.snapshot())
                             })
                             .collect::<BTreeMap<_, _>>()
@@ -4472,9 +4434,11 @@ mod tests {
                 assert_eq!(
                     guest_buffer.read_with(guest_cx, |buffer, _| buffer.text()),
                     host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
-                    "guest {} buffer {} differs from the host's buffer",
+                    "guest {}, buffer {}, path {:?}, differs from the host's buffer",
                     guest_id,
                     buffer_id,
+                    host_buffer
+                        .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx))
                 );
             }
         }
@@ -4683,8 +4647,9 @@ mod tests {
             language_server_config.set_fake_initializer({
                 let rng = rng.clone();
                 let files = files.clone();
+                let project = project.clone();
                 move |fake_server| {
-                    fake_server.handle_request::<lsp::request::Completion, _>(|_| {
+                    fake_server.handle_request::<lsp::request::Completion, _>(|_, _| {
                         Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
                             text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
                                 range: lsp::Range::new(
@@ -4697,7 +4662,7 @@ mod tests {
                         }]))
                     });
 
-                    fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_| {
+                    fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_, _| {
                         Some(vec![lsp::CodeActionOrCommand::CodeAction(
                             lsp::CodeAction {
                                 title: "the-code-action".to_string(),
@@ -4706,33 +4671,75 @@ mod tests {
                         )])
                     });
 
-                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _>(|params| {
-                        Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
-                            params.position,
-                            params.position,
-                        )))
-                    });
+                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _>(
+                        |params, _| {
+                            Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
+                                params.position,
+                                params.position,
+                            )))
+                        },
+                    );
 
                     fake_server.handle_request::<lsp::request::GotoDefinition, _>({
                         let files = files.clone();
                         let rng = rng.clone();
-                        move |_| {
+                        move |_, _| {
                             let files = files.lock();
                             let mut rng = rng.lock();
                             let count = rng.gen_range::<usize, _>(1..3);
+                            let files = (0..count)
+                                .map(|_| files.choose(&mut *rng).unwrap())
+                                .collect::<Vec<_>>();
+                            log::info!("LSP: Returning definitions in files {:?}", &files);
                             Some(lsp::GotoDefinitionResponse::Array(
-                                (0..count)
-                                    .map(|_| {
-                                        let file = files.choose(&mut *rng).unwrap().as_path();
-                                        lsp::Location {
-                                            uri: lsp::Url::from_file_path(file).unwrap(),
-                                            range: Default::default(),
-                                        }
+                                files
+                                    .into_iter()
+                                    .map(|file| lsp::Location {
+                                        uri: lsp::Url::from_file_path(file).unwrap(),
+                                        range: Default::default(),
                                     })
                                     .collect(),
                             ))
                         }
                     });
+
+                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _>({
+                        let rng = rng.clone();
+                        let project = project.clone();
+                        move |params, mut cx| {
+                            project.update(&mut cx, |project, cx| {
+                                let path = params
+                                    .text_document_position_params
+                                    .text_document
+                                    .uri
+                                    .to_file_path()
+                                    .unwrap();
+                                let (worktree, relative_path) =
+                                    project.find_local_worktree(&path, cx)?;
+                                let project_path =
+                                    ProjectPath::from((worktree.read(cx).id(), relative_path));
+                                let buffer = project.get_open_buffer(&project_path, cx)?.read(cx);
+
+                                let mut highlights = Vec::new();
+                                let highlight_count = rng.lock().gen_range(1..=5);
+                                let mut prev_end = 0;
+                                for _ in 0..highlight_count {
+                                    let range =
+                                        buffer.random_byte_range(prev_end, &mut *rng.lock());
+                                    let start =
+                                        buffer.offset_to_point_utf16(range.start).to_lsp_position();
+                                    let end =
+                                        buffer.offset_to_point_utf16(range.end).to_lsp_position();
+                                    highlights.push(lsp::DocumentHighlight {
+                                        range: lsp::Range::new(start, end),
+                                        kind: Some(lsp::DocumentHighlightKind::READ),
+                                    });
+                                    prev_end = range.end;
+                                }
+                                Some(highlights)
+                            })
+                        }
+                    });
                 }
             });
 
@@ -4778,13 +4785,17 @@ mod tests {
                                 let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
                                 let (worktree, path) = project
                                     .update(&mut cx, |project, cx| {
-                                        project.find_or_create_local_worktree(file, false, cx)
+                                        project.find_or_create_local_worktree(
+                                            file.clone(),
+                                            false,
+                                            cx,
+                                        )
                                     })
                                     .await
                                     .unwrap();
                                 let project_path =
                                     worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
-                                log::info!("Host: opening path {:?}", project_path);
+                                log::info!("Host: opening path {:?}, {:?}", file, project_path);
                                 let buffer = project
                                     .update(&mut cx, |project, cx| {
                                         project.open_buffer(project_path, cx)
@@ -4847,6 +4858,8 @@ mod tests {
                     cx.background().simulate_random_delay().await;
                 }
 
+                log::info!("Host done");
+
                 self.project = Some(project);
                 (self, cx)
             }
@@ -4867,7 +4880,8 @@ mod tests {
                         project
                             .worktrees(&cx)
                             .filter(|worktree| {
-                                worktree.read(cx).entries(false).any(|e| e.is_file())
+                                let worktree = worktree.read(cx);
+                                !worktree.is_weak() && worktree.entries(false).any(|e| e.is_file())
                             })
                             .choose(&mut *rng.lock())
                     }) {
@@ -4878,15 +4892,25 @@ mod tests {
                     };
 
                     operations.set(operations.get() + 1);
-                    let project_path = worktree.read_with(&cx, |worktree, _| {
-                        let entry = worktree
-                            .entries(false)
-                            .filter(|e| e.is_file())
-                            .choose(&mut *rng.lock())
-                            .unwrap();
-                        (worktree.id(), entry.path.clone())
-                    });
-                    log::info!("Guest {}: opening path {:?}", guest_id, project_path);
+                    let (worktree_root_name, project_path) =
+                        worktree.read_with(&cx, |worktree, _| {
+                            let entry = worktree
+                                .entries(false)
+                                .filter(|e| e.is_file())
+                                .choose(&mut *rng.lock())
+                                .unwrap();
+                            (
+                                worktree.root_name().to_string(),
+                                (worktree.id(), entry.path.clone()),
+                            )
+                        });
+                    log::info!(
+                        "Guest {}: opening path in worktree {:?} {:?} {:?}",
+                        guest_id,
+                        project_path.0,
+                        worktree_root_name,
+                        project_path.1
+                    );
                     let buffer = project
                         .update(&mut cx, |project, cx| project.open_buffer(project_path, cx))
                         .await
@@ -5010,13 +5034,34 @@ mod tests {
                             project.definition(&buffer, offset, cx)
                         });
                         let definitions = cx.background().spawn(async move {
-                            definitions.await.expect("definitions request failed");
+                            definitions.await.expect("definitions request failed")
                         });
                         if rng.lock().gen_bool(0.3) {
                             log::info!("Guest {}: detaching definitions request", guest_id);
                             definitions.detach();
                         } else {
-                            definitions.await;
+                            self.buffers
+                                .extend(definitions.await.into_iter().map(|loc| loc.buffer));
+                        }
+                    }
+                    50..=55 => {
+                        let highlights = project.update(&mut cx, |project, cx| {
+                            log::info!(
+                                "Guest {}: requesting highlights for buffer {:?}",
+                                guest_id,
+                                buffer.read(cx).file().unwrap().full_path(cx)
+                            );
+                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
+                            project.document_highlights(&buffer, offset, cx)
+                        });
+                        let highlights = cx.background().spawn(async move {
+                            highlights.await.expect("highlights request failed");
+                        });
+                        if rng.lock().gen_bool(0.3) {
+                            log::info!("Guest {}: detaching highlights request", guest_id);
+                            highlights.detach();
+                        } else {
+                            highlights.await;
                         }
                     }
                     _ => {
@@ -5033,6 +5078,8 @@ mod tests {
                 cx.background().simulate_random_delay().await;
             }
 
+            log::info!("Guest {} done", guest_id);
+
             self.project = Some(project);
             (self, cx)
         }

crates/server/src/rpc/store.rs 🔗

@@ -30,7 +30,6 @@ pub struct Project {
 pub struct Worktree {
     pub authorized_user_ids: Vec<UserId>,
     pub root_name: String,
-    pub share: Option<WorktreeShare>,
     pub weak: bool,
 }
 
@@ -38,12 +37,13 @@ pub struct Worktree {
 pub struct ProjectShare {
     pub guests: HashMap<ConnectionId, (ReplicaId, UserId)>,
     pub active_replica_ids: HashSet<ReplicaId>,
+    pub worktrees: HashMap<u64, WorktreeShare>,
 }
 
+#[derive(Default)]
 pub struct WorktreeShare {
     pub entries: HashMap<u64, proto::Entry>,
     pub diagnostic_summaries: BTreeMap<PathBuf, proto::DiagnosticSummary>,
-    pub next_update_id: u64,
 }
 
 #[derive(Default)]
@@ -75,11 +75,6 @@ pub struct LeftProject {
     pub authorized_user_ids: Vec<UserId>,
 }
 
-pub struct SharedWorktree {
-    pub authorized_user_ids: Vec<UserId>,
-    pub connection_ids: Vec<ConnectionId>,
-}
-
 impl Store {
     pub fn add_connection(&mut self, connection_id: ConnectionId, user_id: UserId) {
         self.connections.insert(
@@ -273,6 +268,9 @@ impl Store {
                 connection.projects.insert(project_id);
             }
             project.worktrees.insert(worktree_id, worktree);
+            if let Ok(share) = project.share_mut() {
+                share.worktrees.insert(worktree_id, Default::default());
+            }
 
             #[cfg(test)]
             self.check_invariants();
@@ -327,8 +325,9 @@ impl Store {
             .ok_or_else(|| anyhow!("no such worktree"))?;
 
         let mut guest_connection_ids = Vec::new();
-        if let Some(share) = &project.share {
+        if let Ok(share) = project.share_mut() {
             guest_connection_ids.extend(share.guests.keys());
+            share.worktrees.remove(&worktree_id);
         }
 
         for authorized_user_id in &worktree.authorized_user_ids {
@@ -350,7 +349,11 @@ impl Store {
     pub fn share_project(&mut self, project_id: u64, connection_id: ConnectionId) -> bool {
         if let Some(project) = self.projects.get_mut(&project_id) {
             if project.host_connection_id == connection_id {
-                project.share = Some(ProjectShare::default());
+                let mut share = ProjectShare::default();
+                for worktree_id in project.worktrees.keys() {
+                    share.worktrees.insert(*worktree_id, Default::default());
+                }
+                project.share = Some(share);
                 return true;
             }
         }
@@ -381,10 +384,6 @@ impl Store {
                 }
             }
 
-            for worktree in project.worktrees.values_mut() {
-                worktree.share.take();
-            }
-
             #[cfg(test)]
             self.check_invariants();
 
@@ -397,38 +396,6 @@ impl Store {
         }
     }
 
-    pub fn share_worktree(
-        &mut self,
-        project_id: u64,
-        worktree_id: u64,
-        connection_id: ConnectionId,
-        entries: HashMap<u64, proto::Entry>,
-        diagnostic_summaries: BTreeMap<PathBuf, proto::DiagnosticSummary>,
-        next_update_id: u64,
-    ) -> tide::Result<SharedWorktree> {
-        let project = self
-            .projects
-            .get_mut(&project_id)
-            .ok_or_else(|| anyhow!("no such project"))?;
-        let worktree = project
-            .worktrees
-            .get_mut(&worktree_id)
-            .ok_or_else(|| anyhow!("no such worktree"))?;
-        if project.host_connection_id == connection_id && project.share.is_some() {
-            worktree.share = Some(WorktreeShare {
-                entries,
-                diagnostic_summaries,
-                next_update_id,
-            });
-            Ok(SharedWorktree {
-                authorized_user_ids: project.authorized_user_ids(),
-                connection_ids: project.guest_connection_ids(),
-            })
-        } else {
-            Err(anyhow!("no such worktree"))?
-        }
-    }
-
     pub fn update_diagnostic_summary(
         &mut self,
         project_id: u64,
@@ -440,17 +407,16 @@ impl Store {
             .projects
             .get_mut(&project_id)
             .ok_or_else(|| anyhow!("no such project"))?;
-        let worktree = project
-            .worktrees
-            .get_mut(&worktree_id)
-            .ok_or_else(|| anyhow!("no such worktree"))?;
         if project.host_connection_id == connection_id {
-            if let Some(share) = worktree.share.as_mut() {
-                share
-                    .diagnostic_summaries
-                    .insert(summary.path.clone().into(), summary);
-                return Ok(project.connection_ids());
-            }
+            let worktree = project
+                .share_mut()?
+                .worktrees
+                .get_mut(&worktree_id)
+                .ok_or_else(|| anyhow!("no such worktree"))?;
+            worktree
+                .diagnostic_summaries
+                .insert(summary.path.clone().into(), summary);
+            return Ok(project.connection_ids());
         }
 
         Err(anyhow!("no such worktree"))?
@@ -537,28 +503,20 @@ impl Store {
         connection_id: ConnectionId,
         project_id: u64,
         worktree_id: u64,
-        update_id: u64,
         removed_entries: &[u64],
         updated_entries: &[proto::Entry],
     ) -> tide::Result<Vec<ConnectionId>> {
         let project = self.write_project(project_id, connection_id)?;
-        let share = project
+        let worktree = project
+            .share_mut()?
             .worktrees
             .get_mut(&worktree_id)
-            .ok_or_else(|| anyhow!("no such worktree"))?
-            .share
-            .as_mut()
-            .ok_or_else(|| anyhow!("worktree is not shared"))?;
-        if share.next_update_id != update_id {
-            return Err(anyhow!("received worktree updates out-of-order"))?;
-        }
-
-        share.next_update_id = update_id + 1;
+            .ok_or_else(|| anyhow!("no such worktree"))?;
         for entry_id in removed_entries {
-            share.entries.remove(&entry_id);
+            worktree.entries.remove(&entry_id);
         }
         for entry in updated_entries {
-            share.entries.insert(entry.id, entry.clone());
+            worktree.entries.insert(entry.id, entry.clone());
         }
         Ok(project.connection_ids())
     }