Handle buffer deduping in the worktree instead of in workspace

Max Brunsfeld created

Previously, buffers were only deduped by file if they were opened
through Workspace::open_entry

Change summary

Cargo.lock                        |   1 
crates/project/Cargo.toml         |   2 
crates/project/src/worktree.rs    | 434 +++++++++++++++++++++-----------
crates/server/src/rpc.rs          |   2 
crates/workspace/src/workspace.rs |  53 ---
5 files changed, 294 insertions(+), 198 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -3456,6 +3456,7 @@ dependencies = [
  "async-trait",
  "client",
  "clock",
+ "collections",
  "fsevent",
  "futures",
  "fuzzy",

crates/project/Cargo.toml 🔗

@@ -13,6 +13,7 @@ test-support = ["language/test-support", "text/test-support"]
 text = { path = "../text" }
 client = { path = "../client" }
 clock = { path = "../clock" }
+collections = { path = "../collections" }
 fsevent = { path = "../fsevent" }
 fuzzy = { path = "../fuzzy" }
 gpui = { path = "../gpui" }
@@ -37,6 +38,7 @@ toml = "0.5"
 
 [dev-dependencies]
 client = { path = "../client", features = ["test-support"] }
+collections = { path = "../collections", features = ["test-support"] }
 gpui = { path = "../gpui", features = ["test-support"] }
 language = { path = "../language", features = ["test-support"] }
 lsp = { path = "../lsp", features = ["test-support"] }

crates/project/src/worktree.rs 🔗

@@ -7,6 +7,7 @@ use ::ignore::gitignore::{Gitignore, GitignoreBuilder};
 use anyhow::{anyhow, Context, Result};
 use client::{proto, Client, PeerId, TypedEnvelope, User, UserStore};
 use clock::ReplicaId;
+use collections::{hash_map, HashMap};
 use futures::{Stream, StreamExt};
 use fuzzy::CharBag;
 use gpui::{
@@ -14,8 +15,8 @@ use gpui::{
     Task, UpgradeModelHandle, WeakModelHandle,
 };
 use language::{
-    Buffer, Diagnostic, DiagnosticEntry, DiagnosticSeverity, Language, LanguageRegistry, Operation,
-    PointUtf16, Rope,
+    Buffer, Diagnostic, DiagnosticEntry, DiagnosticSeverity, File as _, Language, LanguageRegistry,
+    Operation, PointUtf16, Rope,
 };
 use lazy_static::lazy_static;
 use lsp::LanguageServer;
@@ -29,7 +30,6 @@ use smol::channel::{self, Sender};
 use std::{
     any::Any,
     cmp::{self, Ordering},
-    collections::HashMap,
     convert::{TryFrom, TryInto},
     ffi::{OsStr, OsString},
     fmt,
@@ -240,7 +240,7 @@ impl Worktree {
         user_store
             .update(cx, |user_store, cx| user_store.load_users(user_ids, cx))
             .await?;
-        let mut collaborators = HashMap::with_capacity(join_response.collaborators.len());
+        let mut collaborators = HashMap::default();
         for message in join_response.collaborators {
             let collaborator = Collaborator::from_proto(message, &user_store, cx).await?;
             collaborators.insert(collaborator.peer_id, collaborator);
@@ -306,8 +306,9 @@ impl Worktree {
                     snapshot_rx,
                     updates_tx,
                     client: client.clone(),
+                    loading_buffers: Default::default(),
                     open_buffers: Default::default(),
-                    diagnostics: Vec::new(),
+                    diagnostic_summaries: HashMap::default(),
                     collaborators,
                     queued_operations: Default::default(),
                     languages,
@@ -476,15 +477,65 @@ impl Worktree {
         std::iter::empty()
     }
 
+    pub fn loading_buffers<'a>(&'a mut self) -> &'a mut LoadingBuffers {
+        match self {
+            Worktree::Local(worktree) => &mut worktree.loading_buffers,
+            Worktree::Remote(worktree) => &mut worktree.loading_buffers,
+        }
+    }
+
     pub fn open_buffer(
         &mut self,
         path: impl AsRef<Path>,
         cx: &mut ModelContext<Self>,
     ) -> Task<Result<ModelHandle<Buffer>>> {
-        match self {
-            Worktree::Local(worktree) => worktree.open_buffer(path.as_ref(), cx),
-            Worktree::Remote(worktree) => worktree.open_buffer(path.as_ref(), cx),
+        let path = path.as_ref();
+
+        // If there is already a buffer for the given path, then return it.
+        let existing_buffer = match self {
+            Worktree::Local(worktree) => worktree.get_open_buffer(path, cx),
+            Worktree::Remote(worktree) => worktree.get_open_buffer(path, cx),
+        };
+        if let Some(existing_buffer) = existing_buffer {
+            return cx.spawn(move |_, _| async move { Ok(existing_buffer) });
         }
+
+        let path: Arc<Path> = Arc::from(path);
+        let mut loading_watch = match self.loading_buffers().entry(path.clone()) {
+            // If the given path is already being loaded, then wait for that existing
+            // task to complete and return the same buffer.
+            hash_map::Entry::Occupied(e) => e.get().clone(),
+
+            // Otherwise, record the fact that this path is now being loaded.
+            hash_map::Entry::Vacant(entry) => {
+                let (mut tx, rx) = postage::watch::channel();
+                entry.insert(rx.clone());
+
+                let load_buffer = match self {
+                    Worktree::Local(worktree) => worktree.open_buffer(&path, cx),
+                    Worktree::Remote(worktree) => worktree.open_buffer(&path, cx),
+                };
+                cx.spawn(move |this, mut cx| async move {
+                    let result = load_buffer.await;
+
+                    // After the buffer loads, record the fact that it is no longer
+                    // loading.
+                    this.update(&mut cx, |this, _| this.loading_buffers().remove(&path));
+                    *tx.borrow_mut() = Some(result.map_err(|e| Arc::new(e)));
+                })
+                .detach();
+                rx
+            }
+        };
+
+        cx.spawn(|_, _| async move {
+            loop {
+                if let Some(result) = loading_watch.borrow().as_ref() {
+                    return result.clone().map_err(|e| anyhow!("{}", e));
+                }
+                loading_watch.recv().await;
+            }
+        })
     }
 
     #[cfg(feature = "test-support")]
@@ -769,8 +820,8 @@ impl Worktree {
                 .context("path is not within worktree")?,
         );
 
-        let mut group_ids_by_diagnostic_range = HashMap::new();
-        let mut diagnostics_by_group_id = HashMap::new();
+        let mut group_ids_by_diagnostic_range = HashMap::default();
+        let mut diagnostics_by_group_id = HashMap::default();
         let mut next_group_id = 0;
         for diagnostic in &params.diagnostics {
             let source = diagnostic.source.as_ref();
@@ -878,15 +929,18 @@ impl Worktree {
     }
 }
 
-impl Deref for Worktree {
-    type Target = Snapshot;
-
-    fn deref(&self) -> &Self::Target {
-        match self {
-            Worktree::Local(worktree) => &worktree.snapshot,
-            Worktree::Remote(worktree) => &worktree.snapshot,
-        }
-    }
+#[derive(Clone)]
+pub struct Snapshot {
+    id: usize,
+    scan_id: usize,
+    abs_path: Arc<Path>,
+    root_name: String,
+    root_char_bag: CharBag,
+    ignores: HashMap<Arc<Path>, (Arc<Gitignore>, usize)>,
+    entries_by_path: SumTree<Entry>,
+    entries_by_id: SumTree<PathEntry>,
+    removed_entry_ids: HashMap<u64, usize>,
+    next_entry_id: Arc<AtomicUsize>,
 }
 
 pub struct LocalWorktree {
@@ -899,9 +953,11 @@ pub struct LocalWorktree {
     poll_task: Option<Task<()>>,
     remote_id: watch::Receiver<Option<u64>>,
     share: Option<ShareState>,
+    loading_buffers: LoadingBuffers,
     open_buffers: HashMap<usize, WeakModelHandle<Buffer>>,
     shared_buffers: HashMap<PeerId, HashMap<u64, ModelHandle<Buffer>>>,
     diagnostics: HashMap<Arc<Path>, Vec<DiagnosticEntry<PointUtf16>>>,
+    diagnostic_summaries: HashMap<Arc<Path>, DiagnosticSummary>,
     collaborators: HashMap<PeerId, Collaborator>,
     queued_operations: Vec<(u64, Operation)>,
     languages: Arc<LanguageRegistry>,
@@ -911,6 +967,33 @@ pub struct LocalWorktree {
     language_servers: HashMap<String, Arc<LanguageServer>>,
 }
 
+struct ShareState {
+    snapshots_tx: Sender<Snapshot>,
+    _subscriptions: Vec<client::Subscription>,
+}
+
+pub struct RemoteWorktree {
+    remote_id: u64,
+    snapshot: Snapshot,
+    snapshot_rx: watch::Receiver<Snapshot>,
+    client: Arc<Client>,
+    updates_tx: postage::mpsc::Sender<proto::UpdateWorktree>,
+    replica_id: ReplicaId,
+    loading_buffers: LoadingBuffers,
+    open_buffers: HashMap<usize, RemoteBuffer>,
+    collaborators: HashMap<PeerId, Collaborator>,
+    diagnostic_summaries: HashMap<Arc<Path>, DiagnosticSummary>,
+    languages: Arc<LanguageRegistry>,
+    user_store: ModelHandle<UserStore>,
+    queued_operations: Vec<(u64, Operation)>,
+    _subscriptions: Vec<client::Subscription>,
+}
+
+type LoadingBuffers = HashMap<
+    Arc<Path>,
+    postage::watch::Receiver<Option<Result<ModelHandle<Buffer>, Arc<anyhow::Error>>>>,
+>;
+
 #[derive(Default, Deserialize)]
 struct WorktreeConfig {
     collaborators: Vec<String>,
@@ -1015,9 +1098,11 @@ impl LocalWorktree {
                 _maintain_remote_id_task,
                 share: None,
                 poll_task: None,
+                loading_buffers: Default::default(),
                 open_buffers: Default::default(),
                 shared_buffers: Default::default(),
                 diagnostics: Default::default(),
+                diagnostic_summaries: Default::default(),
                 queued_operations: Default::default(),
                 collaborators: Default::default(),
                 languages,
@@ -1105,20 +1190,18 @@ impl LocalWorktree {
         }
     }
 
-    pub fn open_buffer(
+    fn get_open_buffer(
         &mut self,
         path: &Path,
         cx: &mut ModelContext<Worktree>,
-    ) -> Task<Result<ModelHandle<Buffer>>> {
+    ) -> Option<ModelHandle<Buffer>> {
         let handle = cx.handle();
-
-        // If there is already a buffer for the given path, then return it.
-        let mut existing_buffer = None;
+        let mut result = None;
         self.open_buffers.retain(|_buffer_id, buffer| {
             if let Some(buffer) = buffer.upgrade(cx.as_ref()) {
                 if let Some(file) = buffer.read(cx.as_ref()).file() {
                     if file.worktree_id() == handle.id() && file.path().as_ref() == path {
-                        existing_buffer = Some(buffer);
+                        result = Some(buffer);
                     }
                 }
                 true
@@ -1126,45 +1209,45 @@ impl LocalWorktree {
                 false
             }
         });
+        result
+    }
 
+    fn open_buffer(
+        &mut self,
+        path: &Path,
+        cx: &mut ModelContext<Worktree>,
+    ) -> Task<Result<ModelHandle<Buffer>>> {
         let path = Arc::from(path);
-        cx.spawn(|this, mut cx| async move {
-            if let Some(existing_buffer) = existing_buffer {
-                Ok(existing_buffer)
-            } else {
-                let (file, contents) = this
-                    .update(&mut cx, |this, cx| this.as_local().unwrap().load(&path, cx))
-                    .await?;
-                let language = this.read_with(&cx, |this, _| {
-                    use language::File;
-                    this.languages().select_language(file.full_path()).cloned()
-                });
-                let (diagnostics, language_server) = this.update(&mut cx, |this, cx| {
-                    let this = this.as_local_mut().unwrap();
-                    (
-                        this.diagnostics.remove(path.as_ref()),
-                        language
-                            .as_ref()
-                            .and_then(|language| this.ensure_language_server(language, cx)),
-                    )
-                });
-                let buffer = cx.add_model(|cx| {
-                    let mut buffer = Buffer::from_file(0, contents, Box::new(file), cx);
-                    buffer.set_language(language, language_server, cx);
-                    if let Some(diagnostics) = diagnostics {
-                        buffer.update_diagnostics(None, diagnostics, cx).unwrap();
-                    }
-                    buffer
-                });
-                this.update(&mut cx, |this, _| {
-                    let this = this
-                        .as_local_mut()
-                        .ok_or_else(|| anyhow!("must be a local worktree"))?;
+        cx.spawn(move |this, mut cx| async move {
+            let (file, contents) = this
+                .update(&mut cx, |t, cx| t.as_local().unwrap().load(&path, cx))
+                .await?;
 
-                    this.open_buffers.insert(buffer.id(), buffer.downgrade());
-                    Ok(buffer)
-                })
-            }
+            let (diagnostics, language, language_server) = this.update(&mut cx, |this, cx| {
+                let this = this.as_local_mut().unwrap();
+                let diagnostics = this.diagnostics.remove(&path);
+                let language = this.languages.select_language(file.full_path()).cloned();
+                let server = language
+                    .as_ref()
+                    .and_then(|language| this.ensure_language_server(language, cx));
+                (diagnostics, language, server)
+            });
+
+            let buffer = cx.add_model(|cx| {
+                let mut buffer = Buffer::from_file(0, contents, Box::new(file), cx);
+                buffer.set_language(language, language_server, cx);
+                if let Some(diagnostics) = diagnostics {
+                    buffer.update_diagnostics(None, diagnostics, cx).unwrap();
+                }
+                buffer
+            });
+
+            this.update(&mut cx, |this, _| {
+                let this = this.as_local_mut().unwrap();
+                this.open_buffers.insert(buffer.id(), buffer.downgrade());
+            });
+
+            Ok(buffer)
         })
     }
 
@@ -1173,13 +1256,12 @@ impl LocalWorktree {
         envelope: TypedEnvelope<proto::OpenBuffer>,
         cx: &mut ModelContext<Worktree>,
     ) -> Task<Result<proto::OpenBufferResponse>> {
-        let peer_id = envelope.original_sender_id();
-        let path = Path::new(&envelope.payload.path);
-
-        let buffer = self.open_buffer(path, cx);
-
         cx.spawn(|this, mut cx| async move {
-            let buffer = buffer.await?;
+            let peer_id = envelope.original_sender_id();
+            let path = Path::new(&envelope.payload.path);
+            let buffer = this
+                .update(&mut cx, |this, cx| this.open_buffer(path, cx))
+                .await?;
             this.update(&mut cx, |this, cx| {
                 this.as_local_mut()
                     .unwrap()
@@ -1473,6 +1555,17 @@ fn build_gitignore(abs_path: &Path, fs: &dyn Fs) -> Result<Gitignore> {
     Ok(builder.build()?)
 }
 
+impl Deref for Worktree {
+    type Target = Snapshot;
+
+    fn deref(&self) -> &Self::Target {
+        match self {
+            Worktree::Local(worktree) => &worktree.snapshot,
+            Worktree::Remote(worktree) => &worktree.snapshot,
+        }
+    }
+}
+
 impl Deref for LocalWorktree {
     type Target = Snapshot;
 
@@ -1487,38 +1580,18 @@ impl fmt::Debug for LocalWorktree {
     }
 }
 
-struct ShareState {
-    snapshots_tx: Sender<Snapshot>,
-    _subscriptions: Vec<client::Subscription>,
-}
-
-pub struct RemoteWorktree {
-    remote_id: u64,
-    snapshot: Snapshot,
-    snapshot_rx: watch::Receiver<Snapshot>,
-    client: Arc<Client>,
-    updates_tx: postage::mpsc::Sender<proto::UpdateWorktree>,
-    replica_id: ReplicaId,
-    open_buffers: HashMap<usize, RemoteBuffer>,
-    collaborators: HashMap<PeerId, Collaborator>,
-    diagnostics: Vec<DiagnosticSummary>,
-    languages: Arc<LanguageRegistry>,
-    user_store: ModelHandle<UserStore>,
-    queued_operations: Vec<(u64, Operation)>,
-    _subscriptions: Vec<client::Subscription>,
-}
-
 impl RemoteWorktree {
-    pub fn open_buffer(
+    fn get_open_buffer(
         &mut self,
         path: &Path,
         cx: &mut ModelContext<Worktree>,
-    ) -> Task<Result<ModelHandle<Buffer>>> {
+    ) -> Option<ModelHandle<Buffer>> {
+        let handle = cx.handle();
         let mut existing_buffer = None;
         self.open_buffers.retain(|_buffer_id, buffer| {
             if let Some(buffer) = buffer.upgrade(cx.as_ref()) {
                 if let Some(file) = buffer.read(cx.as_ref()).file() {
-                    if file.worktree_id() == cx.model_id() && file.path().as_ref() == path {
+                    if file.worktree_id() == handle.id() && file.path().as_ref() == path {
                         existing_buffer = Some(buffer);
                     }
                 }
@@ -1527,62 +1600,65 @@ impl RemoteWorktree {
                 false
             }
         });
+        existing_buffer
+    }
 
+    fn open_buffer(
+        &mut self,
+        path: &Path,
+        cx: &mut ModelContext<Worktree>,
+    ) -> Task<Result<ModelHandle<Buffer>>> {
         let rpc = self.client.clone();
         let replica_id = self.replica_id;
         let remote_worktree_id = self.remote_id;
         let root_path = self.snapshot.abs_path.clone();
-        let path = path.to_string_lossy().to_string();
-        cx.spawn_weak(|this, mut cx| async move {
-            if let Some(existing_buffer) = existing_buffer {
-                Ok(existing_buffer)
-            } else {
-                let entry = this
-                    .upgrade(&cx)
-                    .ok_or_else(|| anyhow!("worktree was closed"))?
-                    .read_with(&cx, |tree, _| tree.entry_for_path(&path).cloned())
-                    .ok_or_else(|| anyhow!("file does not exist"))?;
-                let response = rpc
-                    .request(proto::OpenBuffer {
-                        worktree_id: remote_worktree_id as u64,
-                        path,
-                    })
-                    .await?;
-
-                let this = this
-                    .upgrade(&cx)
-                    .ok_or_else(|| anyhow!("worktree was closed"))?;
-                let file = File {
-                    entry_id: Some(entry.id),
-                    worktree: this.clone(),
-                    worktree_path: root_path,
-                    path: entry.path,
-                    mtime: entry.mtime,
-                    is_local: false,
-                };
-                let language = this.read_with(&cx, |this, _| {
-                    use language::File;
-                    this.languages().select_language(file.full_path()).cloned()
-                });
-                let remote_buffer = response.buffer.ok_or_else(|| anyhow!("empty buffer"))?;
-                let buffer_id = remote_buffer.id as usize;
-                let buffer = cx.add_model(|cx| {
-                    Buffer::from_proto(replica_id, remote_buffer, Some(Box::new(file)), cx)
-                        .unwrap()
-                        .with_language(language, None, cx)
-                });
-                this.update(&mut cx, |this, cx| {
-                    let this = this.as_remote_mut().unwrap();
-                    if let Some(RemoteBuffer::Operations(pending_ops)) = this
-                        .open_buffers
-                        .insert(buffer_id, RemoteBuffer::Loaded(buffer.downgrade()))
-                    {
-                        buffer.update(cx, |buf, cx| buf.apply_ops(pending_ops, cx))?;
-                    }
-                    Result::<_, anyhow::Error>::Ok(())
-                })?;
-                Ok(buffer)
-            }
+        let path: Arc<Path> = Arc::from(path);
+        let path_string = path.to_string_lossy().to_string();
+        cx.spawn_weak(move |this, mut cx| async move {
+            let entry = this
+                .upgrade(&cx)
+                .ok_or_else(|| anyhow!("worktree was closed"))?
+                .read_with(&cx, |tree, _| tree.entry_for_path(&path).cloned())
+                .ok_or_else(|| anyhow!("file does not exist"))?;
+            let response = rpc
+                .request(proto::OpenBuffer {
+                    worktree_id: remote_worktree_id as u64,
+                    path: path_string,
+                })
+                .await?;
+
+            let this = this
+                .upgrade(&cx)
+                .ok_or_else(|| anyhow!("worktree was closed"))?;
+            let file = File {
+                entry_id: Some(entry.id),
+                worktree: this.clone(),
+                worktree_path: root_path,
+                path: entry.path,
+                mtime: entry.mtime,
+                is_local: false,
+            };
+            let language = this.read_with(&cx, |this, _| {
+                use language::File;
+                this.languages().select_language(file.full_path()).cloned()
+            });
+            let remote_buffer = response.buffer.ok_or_else(|| anyhow!("empty buffer"))?;
+            let buffer_id = remote_buffer.id as usize;
+            let buffer = cx.add_model(|cx| {
+                Buffer::from_proto(replica_id, remote_buffer, Some(Box::new(file)), cx)
+                    .unwrap()
+                    .with_language(language, None, cx)
+            });
+            this.update(&mut cx, move |this, cx| {
+                let this = this.as_remote_mut().unwrap();
+                if let Some(RemoteBuffer::Operations(pending_ops)) = this
+                    .open_buffers
+                    .insert(buffer_id, RemoteBuffer::Loaded(buffer.downgrade()))
+                {
+                    buffer.update(cx, |buf, cx| buf.apply_ops(pending_ops, cx))?;
+                }
+                Result::<_, anyhow::Error>::Ok(buffer)
+            })
         })
     }
 
@@ -1665,20 +1741,6 @@ impl RemoteBuffer {
     }
 }
 
-#[derive(Clone)]
-pub struct Snapshot {
-    id: usize,
-    scan_id: usize,
-    abs_path: Arc<Path>,
-    root_name: String,
-    root_char_bag: CharBag,
-    ignores: HashMap<Arc<Path>, (Arc<Gitignore>, usize)>,
-    entries_by_path: SumTree<Entry>,
-    entries_by_id: SumTree<PathEntry>,
-    removed_entry_ids: HashMap<u64, usize>,
-    next_entry_id: Arc<AtomicUsize>,
-}
-
 impl Snapshot {
     pub fn id(&self) -> usize {
         self.id
@@ -3519,6 +3581,64 @@ mod tests {
         server.receive::<proto::CloseWorktree>().await.unwrap();
     }
 
+    #[gpui::test]
+    async fn test_buffer_deduping(mut cx: gpui::TestAppContext) {
+        let user_id = 100;
+        let mut client = Client::new();
+        let server = FakeServer::for_client(user_id, &mut client, &cx).await;
+        let user_store = server.build_user_store(client.clone(), &mut cx).await;
+
+        let fs = Arc::new(FakeFs::new());
+        fs.insert_tree(
+            "/the-dir",
+            json!({
+                "a.txt": "a-contents",
+                "b.txt": "b-contents",
+            }),
+        )
+        .await;
+
+        let worktree = Worktree::open_local(
+            client.clone(),
+            user_store,
+            "/the-dir".as_ref(),
+            fs,
+            Default::default(),
+            &mut cx.to_async(),
+        )
+        .await
+        .unwrap();
+
+        // Spawn multiple tasks to open paths, repeating some paths.
+        let (buffer_a_1, buffer_b, buffer_a_2) = worktree.update(&mut cx, |worktree, cx| {
+            (
+                worktree.open_buffer("a.txt", cx),
+                worktree.open_buffer("b.txt", cx),
+                worktree.open_buffer("a.txt", cx),
+            )
+        });
+
+        let buffer_a_1 = buffer_a_1.await.unwrap();
+        let buffer_a_2 = buffer_a_2.await.unwrap();
+        let buffer_b = buffer_b.await.unwrap();
+        assert_eq!(buffer_a_1.read_with(&cx, |b, _| b.text()), "a-contents");
+        assert_eq!(buffer_b.read_with(&cx, |b, _| b.text()), "b-contents");
+
+        // There is only one buffer per path.
+        let buffer_a_id = buffer_a_1.id();
+        assert_eq!(buffer_a_2.id(), buffer_a_id);
+
+        // Open the same path again while it is still open.
+        drop(buffer_a_1);
+        let buffer_a_3 = worktree
+            .update(&mut cx, |worktree, cx| worktree.open_buffer("a.txt", cx))
+            .await
+            .unwrap();
+
+        // There's still only one buffer per path.
+        assert_eq!(buffer_a_3.id(), buffer_a_id);
+    }
+
     #[gpui::test]
     async fn test_buffer_is_dirty(mut cx: gpui::TestAppContext) {
         use std::fs;
@@ -3985,7 +4105,7 @@ mod tests {
         worktree
             .update(&mut cx, |tree, cx| tree.update_diagnostics(message, cx))
             .unwrap();
-        let buffer = buffer.read_with(&cx, |buffer, cx| buffer.snapshot());
+        let buffer = buffer.read_with(&cx, |buffer, _| buffer.snapshot());
 
         assert_eq!(
             buffer

crates/server/src/rpc.rs 🔗

@@ -1064,7 +1064,7 @@ mod tests {
 
         // TODO
         // // Remove the selection set as client B, see those selections disappear as client A.
-        // cx_b.update(move |_| drop(editor_b));
+        cx_b.update(move |_| drop(editor_b));
         // buffer_a
         //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
         //     .await;

crates/workspace/src/workspace.rs 🔗

@@ -4,7 +4,7 @@ pub mod settings;
 pub mod sidebar;
 mod status_bar;
 
-use anyhow::{anyhow, Result};
+use anyhow::Result;
 use client::{Authenticate, ChannelList, Client, User, UserStore};
 use gpui::{
     action,
@@ -28,7 +28,6 @@ use sidebar::{Side, Sidebar, SidebarItemId, ToggleSidebarItem, ToggleSidebarItem
 use status_bar::StatusBar;
 pub use status_bar::StatusItemView;
 use std::{
-    collections::{hash_map::Entry, HashMap},
     future::Future,
     path::{Path, PathBuf},
     sync::Arc,
@@ -342,10 +341,6 @@ pub struct Workspace {
     project: ModelHandle<Project>,
     entry_openers: Arc<[Box<dyn EntryOpener>]>,
     items: Vec<Box<dyn WeakItemHandle>>,
-    loading_items: HashMap<
-        ProjectPath,
-        postage::watch::Receiver<Option<Result<Box<dyn ItemHandle>, Arc<anyhow::Error>>>>,
-    >,
     _observe_current_user: Task<()>,
 }
 
@@ -408,7 +403,6 @@ impl Workspace {
             project,
             entry_openers: params.entry_openers.clone(),
             items: Default::default(),
-            loading_items: Default::default(),
             _observe_current_user,
         }
     }
@@ -606,43 +600,22 @@ impl Workspace {
             }
         };
 
-        if let Entry::Vacant(entry) = self.loading_items.entry(project_path.clone()) {
-            let (mut tx, rx) = postage::watch::channel();
-            entry.insert(rx);
-
-            let project_path = project_path.clone();
-            let entry_openers = self.entry_openers.clone();
-            cx.as_mut()
-                .spawn(|mut cx| async move {
-                    let item = worktree.update(&mut cx, move |worktree, cx| {
-                        for opener in entry_openers.iter() {
-                            if let Some(task) = opener.open(worktree, project_path.clone(), cx) {
-                                return task;
-                            }
-                        }
-
-                        cx.spawn(|_, _| async move {
-                            Err(anyhow!("no opener for path {:?} found", project_path))
-                        })
-                    });
-                    *tx.borrow_mut() = Some(item.await.map_err(Arc::new));
-                })
-                .detach();
-        }
+        let project_path = project_path.clone();
+        let entry_openers = self.entry_openers.clone();
+        let task = worktree.update(cx, |worktree, cx| {
+            for opener in entry_openers.iter() {
+                if let Some(task) = opener.open(worktree, project_path.clone(), cx) {
+                    return Some(task);
+                }
+            }
+            log::error!("no opener for path {:?} found", project_path);
+            None
+        })?;
 
         let pane = pane.downgrade();
-        let mut watch = self.loading_items.get(&project_path).unwrap().clone();
-
         Some(cx.spawn(|this, mut cx| async move {
-            let load_result = loop {
-                if let Some(load_result) = watch.borrow().as_ref() {
-                    break load_result.clone();
-                }
-                watch.recv().await;
-            };
-
+            let load_result = task.await;
             this.update(&mut cx, |this, cx| {
-                this.loading_items.remove(&project_path);
                 if let Some(pane) = pane.upgrade(&cx) {
                     match load_result {
                         Ok(item) => {