Extract a BufferStore object from Project (#14037)

Max Brunsfeld created this pull request.

This is a ~small~ pure refactor that's a step toward SSH remoting. I've
extracted the Project's buffer state management into a smaller, separate
struct called `BufferStore`, currently in the same crate. I did this as
a separate PR to reduce conflicts between main and `remoting-over-ssh`.

The idea is to use this struct (and other smaller structs that make up
`Project`) in a dedicated, simpler `HeadlessProject` type that we will
use in the SSH server to model the remote end of a project. With this
approach, as we develop the headless project, we can avoid adding more
conditional logic to `Project` itself (which is already very complex),
and actually make `Project` a bit smaller by extracting helper objects.
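
To make the idea concrete, here is a minimal sketch (not part of this PR) of how a `HeadlessProject` might compose the extracted `BufferStore`. The `HeadlessProject` shape, its constructor, and the event-forwarding closure are assumptions; `BufferStore::new`, `open_buffer`, and the gpui `new_model`/`subscribe` calls are the ones that appear in the diff below.

```rust
use anyhow::Result;
use gpui::{Model, ModelContext, Task};
use language::Buffer;
use project::{
    buffer_store::{BufferStore, BufferStoreEvent},
    ProjectPath,
};
use worktree::Worktree;

// Hypothetical headless counterpart to `Project`, holding only the pieces
// the SSH server needs.
struct HeadlessProject {
    buffer_store: Model<BufferStore>,
    worktrees: Vec<Model<Worktree>>,
}

impl HeadlessProject {
    fn new(cx: &mut ModelContext<Self>) -> Self {
        // Retain buffers: the headless host owns them on behalf of remote peers.
        let buffer_store = cx.new_model(|_| BufferStore::new(true));
        cx.subscribe(&buffer_store, |_this, _store, _event: &BufferStoreEvent, _cx| {
            // Forward buffer events to the connected client here.
        })
        .detach();
        Self {
            buffer_store,
            worktrees: Vec::new(),
        }
    }

    // Delegate buffer opening to the store instead of duplicating Project's logic.
    fn open_buffer(
        &mut self,
        path: ProjectPath,
        worktree: Model<Worktree>,
        cx: &mut ModelContext<Self>,
    ) -> Task<Result<Model<Buffer>>> {
        self.buffer_store
            .update(cx, |store, cx| store.open_buffer(path, worktree, cx))
    }
}
```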

Release Notes:

- N/A

Change summary

Cargo.lock                                                    |   1 
crates/client/src/client.rs                                   |  25 
crates/collab/src/tests/integration_tests.rs                  |   8 
crates/collab/src/tests/random_project_collaboration_tests.rs |   6 
crates/editor/src/editor.rs                                   |   2 
crates/editor/src/items.rs                                    |   2 
crates/outline_panel/src/outline_panel.rs                     |   4 
crates/project/src/buffer_store.rs                            | 928 +++++
crates/project/src/lsp_command.rs                             |  14 
crates/project/src/project.rs                                 | 723 --
crates/project/src/project_tests.rs                           |  11 
crates/proto/Cargo.toml                                       |   1 
crates/proto/src/proto.rs                                     |  58 
crates/rpc/src/peer.rs                                        |   8 
crates/worktree/src/worktree.rs                               |  44 
15 files changed, 1,224 insertions(+), 611 deletions(-)

Detailed changes

Cargo.lock

@@ -8199,6 +8199,7 @@ version = "0.1.0"
 dependencies = [
  "anyhow",
  "collections",
+ "futures 0.3.28",
  "prost",
  "prost-build",
  "serde",

crates/client/src/client.rs

@@ -13,8 +13,9 @@ use async_tungstenite::tungstenite::{
 use clock::SystemClock;
 use collections::HashMap;
 use futures::{
-    channel::oneshot, future::LocalBoxFuture, AsyncReadExt, FutureExt, SinkExt, Stream, StreamExt,
-    TryFutureExt as _, TryStreamExt,
+    channel::oneshot,
+    future::{BoxFuture, LocalBoxFuture},
+    AsyncReadExt, FutureExt, SinkExt, Stream, StreamExt, TryFutureExt as _, TryStreamExt,
 };
 use gpui::{
     actions, AnyModel, AnyWeakModel, AppContext, AsyncAppContext, Global, Model, Task, WeakModel,
@@ -23,6 +24,7 @@ use http::{HttpClient, HttpClientWithUrl};
 use lazy_static::lazy_static;
 use parking_lot::RwLock;
 use postage::watch;
+use proto::ProtoClient;
 use rand::prelude::*;
 use release_channel::{AppVersion, ReleaseChannel};
 use rpc::proto::{AnyTypedEnvelope, EntityMessage, EnvelopedMessage, PeerId, RequestMessage};
@@ -1408,6 +1410,11 @@ impl Client {
         self.peer.send(self.connection_id()?, message)
     }
 
+    fn send_dynamic(&self, envelope: proto::Envelope) -> Result<()> {
+        let connection_id = self.connection_id()?;
+        self.peer.send_dynamic(connection_id, envelope)
+    }
+
     pub fn request<T: RequestMessage>(
         &self,
         request: T,
@@ -1606,6 +1613,20 @@ impl Client {
     }
 }
 
+impl ProtoClient for Client {
+    fn request(
+        &self,
+        envelope: proto::Envelope,
+        request_type: &'static str,
+    ) -> BoxFuture<'static, Result<proto::Envelope>> {
+        self.request_dynamic(envelope, request_type).boxed()
+    }
+
+    fn send(&self, envelope: proto::Envelope) -> Result<()> {
+        self.send_dynamic(envelope)
+    }
+}
+
 #[derive(Serialize, Deserialize)]
 struct DevelopmentCredentials {
     user_id: u64,
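
The `ProtoClient` impl above is what decouples buffer and worktree messaging from the collab `Client`. As a hedged illustration of where this is headed, an SSH-side connection could implement the same trait; the `SshConnection` type below is purely hypothetical, and the method signatures are written to match the `impl ProtoClient for Client` in this hunk (assuming `anyhow::Result`, as elsewhere in `client.rs`).

```rust
use anyhow::Result;
use futures::future::BoxFuture;
use proto::{Envelope, ProtoClient};

// Hypothetical transport for the remoting-over-ssh work; not part of this PR.
struct SshConnection {
    // e.g. a handle to an open SSH channel
}

impl ProtoClient for SshConnection {
    fn request(
        &self,
        envelope: Envelope,
        request_type: &'static str,
    ) -> BoxFuture<'static, Result<Envelope>> {
        // Would serialize `envelope`, tag it with `request_type` for logging,
        // send it over the SSH channel, and await the matching response.
        todo!()
    }

    fn send(&self, envelope: Envelope) -> Result<()> {
        // Fire-and-forget: would write `envelope` to the SSH channel.
        todo!()
    }
}
```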

crates/collab/src/tests/integration_tests.rs

@@ -3327,7 +3327,7 @@ async fn test_local_settings(
         let store = cx.global::<SettingsStore>();
         assert_eq!(
             store
-                .local_settings(worktree_b.read(cx).id().to_usize())
+                .local_settings(worktree_b.entity_id().as_u64() as _)
                 .collect::<Vec<_>>(),
             &[
                 (Path::new("").into(), r#"{"tab_size":2}"#.to_string()),
@@ -3346,7 +3346,7 @@ async fn test_local_settings(
         let store = cx.global::<SettingsStore>();
         assert_eq!(
             store
-                .local_settings(worktree_b.read(cx).id().to_usize())
+                .local_settings(worktree_b.entity_id().as_u64() as _)
                 .collect::<Vec<_>>(),
             &[
                 (Path::new("").into(), r#"{}"#.to_string()),
@@ -3375,7 +3375,7 @@ async fn test_local_settings(
         let store = cx.global::<SettingsStore>();
         assert_eq!(
             store
-                .local_settings(worktree_b.read(cx).id().to_usize())
+                .local_settings(worktree_b.entity_id().as_u64() as _)
                 .collect::<Vec<_>>(),
             &[
                 (Path::new("a").into(), r#"{"tab_size":8}"#.to_string()),
@@ -3407,7 +3407,7 @@ async fn test_local_settings(
         let store = cx.global::<SettingsStore>();
         assert_eq!(
             store
-                .local_settings(worktree_b.read(cx).id().to_usize())
+                .local_settings(worktree_b.entity_id().as_u64() as _)
                 .collect::<Vec<_>>(),
             &[(Path::new("a").into(), r#"{"hard_tabs":true}"#.to_string()),]
         )

crates/collab/src/tests/random_project_collaboration_tests.rs

@@ -1237,7 +1237,7 @@ impl RandomizedTest for ProjectCollaborationTest {
                             }
                         }
 
-                        for buffer in guest_project.opened_buffers() {
+                        for buffer in guest_project.opened_buffers(cx) {
                             let buffer = buffer.read(cx);
                             assert_eq!(
                                 buffer.deferred_ops_len(),
@@ -1287,8 +1287,8 @@ impl RandomizedTest for ProjectCollaborationTest {
                 for guest_buffer in guest_buffers {
                     let buffer_id =
                         guest_buffer.read_with(client_cx, |buffer, _| buffer.remote_id());
-                    let host_buffer = host_project.read_with(host_cx, |project, _| {
-                        project.buffer_for_id(buffer_id).unwrap_or_else(|| {
+                    let host_buffer = host_project.read_with(host_cx, |project, cx| {
+                        project.buffer_for_id(buffer_id, cx).unwrap_or_else(|| {
                             panic!(
                                 "host does not have buffer for guest:{}, peer:{:?}, id:{}",
                                 client.username,

crates/editor/src/editor.rs

@@ -8529,7 +8529,7 @@ impl Editor {
     ) -> Vec<(TaskSourceKind, TaskTemplate)> {
         let (inventory, worktree_id, file) = project.read_with(cx, |project, cx| {
             let (worktree_id, file) = project
-                .buffer_for_id(runnable.buffer)
+                .buffer_for_id(runnable.buffer, cx)
                 .and_then(|buffer| buffer.read(cx).file())
                 .map(|file| (WorktreeId::from_usize(file.worktree_id()), file.clone()))
                 .unzip();

crates/editor/src/items.rs

@@ -371,7 +371,7 @@ async fn update_editor_from_message(
                     continue;
                 };
                 let buffer_id = BufferId::new(excerpt.buffer_id)?;
-                let Some(buffer) = project.read(cx).buffer_for_id(buffer_id) else {
+                let Some(buffer) = project.read(cx).buffer_for_id(buffer_id, cx) else {
                     continue;
                 };
 

crates/outline_panel/src/outline_panel.rs

@@ -1129,7 +1129,7 @@ impl OutlinePanel {
             EntryOwned::Entry(FsEntry::File(worktree_id, _, buffer_id, _)) => {
                 let project = self.project.read(cx);
                 let entry_id = project
-                    .buffer_for_id(buffer_id)
+                    .buffer_for_id(buffer_id, cx)
                     .and_then(|buffer| buffer.read(cx).entry_id(cx));
                 project
                     .worktree_for_id(worktree_id, cx)
@@ -1147,7 +1147,7 @@ impl OutlinePanel {
                     .remove(&CollapsedEntry::Excerpt(buffer_id, excerpt_id));
                 let project = self.project.read(cx);
                 let entry_id = project
-                    .buffer_for_id(buffer_id)
+                    .buffer_for_id(buffer_id, cx)
                     .and_then(|buffer| buffer.read(cx).entry_id(cx));
 
                 entry_id.and_then(|entry_id| {

crates/project/src/buffer_store.rs

@@ -0,0 +1,928 @@
+use crate::ProjectPath;
+use anyhow::{anyhow, Context as _, Result};
+use collections::{hash_map, HashMap};
+use futures::{channel::oneshot, StreamExt as _};
+use gpui::{
+    AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Task, WeakModel,
+};
+use language::{
+    proto::{deserialize_version, serialize_version, split_operations},
+    Buffer, Capability, Language, Operation,
+};
+use rpc::{
+    proto::{self, AnyProtoClient, PeerId},
+    ErrorExt as _, TypedEnvelope,
+};
+use std::{io, path::Path, sync::Arc};
+use text::BufferId;
+use util::{debug_panic, maybe, ResultExt as _};
+use worktree::{File, ProjectEntryId, RemoteWorktree, Worktree};
+
+/// A set of open buffers.
+pub struct BufferStore {
+    retain_buffers: bool,
+    opened_buffers: HashMap<BufferId, OpenBuffer>,
+    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
+    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
+    #[allow(clippy::type_complexity)]
+    loading_buffers_by_path: HashMap<
+        ProjectPath,
+        postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
+    >,
+    loading_remote_buffers_by_id: HashMap<BufferId, Model<Buffer>>,
+    remote_buffer_listeners:
+        HashMap<BufferId, Vec<oneshot::Sender<Result<Model<Buffer>, anyhow::Error>>>>,
+}
+
+enum OpenBuffer {
+    Strong(Model<Buffer>),
+    Weak(WeakModel<Buffer>),
+    Operations(Vec<Operation>),
+}
+
+pub enum BufferStoreEvent {
+    BufferAdded(Model<Buffer>),
+    BufferChangedFilePath {
+        buffer: Model<Buffer>,
+        old_file: Option<Arc<File>>,
+    },
+    BufferSaved {
+        buffer: Model<Buffer>,
+        has_changed_file: bool,
+        saved_version: clock::Global,
+    },
+}
+
+impl EventEmitter<BufferStoreEvent> for BufferStore {}
+
+impl BufferStore {
+    /// Creates a buffer store, optionally retaining its buffers.
+    ///
+    /// If `retain_buffers` is `true`, then buffers are owned by the buffer store
+    /// and won't be released unless they are explicitly removed, or `retain_buffers`
+    /// is set to `false` via `set_retain_buffers`. Otherwise, buffers are stored as
+    /// weak handles.
+    pub fn new(retain_buffers: bool) -> Self {
+        Self {
+            retain_buffers,
+            opened_buffers: Default::default(),
+            remote_buffer_listeners: Default::default(),
+            loading_remote_buffers_by_id: Default::default(),
+            local_buffer_ids_by_path: Default::default(),
+            local_buffer_ids_by_entry_id: Default::default(),
+            loading_buffers_by_path: Default::default(),
+        }
+    }
+
+    pub fn open_buffer(
+        &mut self,
+        project_path: ProjectPath,
+        worktree: Model<Worktree>,
+        cx: &mut ModelContext<Self>,
+    ) -> Task<Result<Model<Buffer>>> {
+        let existing_buffer = self.get_by_path(&project_path, cx);
+        if let Some(existing_buffer) = existing_buffer {
+            return Task::ready(Ok(existing_buffer));
+        }
+
+        let loading_watch = match self.loading_buffers_by_path.entry(project_path.clone()) {
+            // If the given path is already being loaded, then wait for that existing
+            // task to complete and return the same buffer.
+            hash_map::Entry::Occupied(e) => e.get().clone(),
+
+            // Otherwise, record the fact that this path is now being loaded.
+            hash_map::Entry::Vacant(entry) => {
+                let (mut tx, rx) = postage::watch::channel();
+                entry.insert(rx.clone());
+
+                let project_path = project_path.clone();
+                let load_buffer = match worktree.read(cx) {
+                    Worktree::Local(_) => {
+                        self.open_local_buffer_internal(project_path.path.clone(), worktree, cx)
+                    }
+                    Worktree::Remote(tree) => {
+                        self.open_remote_buffer_internal(&project_path.path, tree, cx)
+                    }
+                };
+
+                cx.spawn(move |this, mut cx| async move {
+                    let load_result = load_buffer.await;
+                    *tx.borrow_mut() = Some(this.update(&mut cx, |this, _| {
+                        // Record the fact that the buffer is no longer loading.
+                        this.loading_buffers_by_path.remove(&project_path);
+                        let buffer = load_result.map_err(Arc::new)?;
+                        Ok(buffer)
+                    })?);
+                    anyhow::Ok(())
+                })
+                .detach();
+                rx
+            }
+        };
+
+        cx.background_executor().spawn(async move {
+            Self::wait_for_loading_buffer(loading_watch)
+                .await
+                .map_err(|e| e.cloned())
+        })
+    }
+
+    fn open_local_buffer_internal(
+        &mut self,
+        path: Arc<Path>,
+        worktree: Model<Worktree>,
+        cx: &mut ModelContext<Self>,
+    ) -> Task<Result<Model<Buffer>>> {
+        let load_buffer = worktree.update(cx, |worktree, cx| {
+            let load_file = worktree.load_file(path.as_ref(), cx);
+            let reservation = cx.reserve_model();
+            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
+            cx.spawn(move |_, mut cx| async move {
+                let loaded = load_file.await?;
+                let text_buffer = cx
+                    .background_executor()
+                    .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
+                    .await;
+                cx.insert_model(reservation, |_| {
+                    Buffer::build(
+                        text_buffer,
+                        loaded.diff_base,
+                        Some(loaded.file),
+                        Capability::ReadWrite,
+                    )
+                })
+            })
+        });
+
+        cx.spawn(move |this, mut cx| async move {
+            let buffer = match load_buffer.await {
+                Ok(buffer) => Ok(buffer),
+                Err(error) if is_not_found_error(&error) => cx.new_model(|cx| {
+                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
+                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
+                    Buffer::build(
+                        text_buffer,
+                        None,
+                        Some(Arc::new(File {
+                            worktree,
+                            path,
+                            mtime: None,
+                            entry_id: None,
+                            is_local: true,
+                            is_deleted: false,
+                            is_private: false,
+                        })),
+                        Capability::ReadWrite,
+                    )
+                }),
+                Err(e) => Err(e),
+            }?;
+            this.update(&mut cx, |this, cx| {
+                this.add_buffer(buffer.clone(), cx).log_err();
+            })?;
+            Ok(buffer)
+        })
+    }
+
+    fn open_remote_buffer_internal(
+        &self,
+        path: &Arc<Path>,
+        worktree: &RemoteWorktree,
+        cx: &ModelContext<Self>,
+    ) -> Task<Result<Model<Buffer>>> {
+        let worktree_id = worktree.id().to_proto();
+        let project_id = worktree.project_id();
+        let client = worktree.client();
+        let path_string = path.clone().to_string_lossy().to_string();
+        cx.spawn(move |this, mut cx| async move {
+            let response = client
+                .request(proto::OpenBufferByPath {
+                    project_id,
+                    worktree_id,
+                    path: path_string,
+                })
+                .await?;
+            let buffer_id = BufferId::new(response.buffer_id)?;
+            this.update(&mut cx, |this, cx| {
+                this.wait_for_remote_buffer(buffer_id, cx)
+            })?
+            .await
+        })
+    }
+
+    pub fn create_buffer(
+        &mut self,
+        remote_client: Option<(AnyProtoClient, u64)>,
+        cx: &mut ModelContext<Self>,
+    ) -> Task<Result<Model<Buffer>>> {
+        if let Some((remote_client, project_id)) = remote_client {
+            let create = remote_client.request(proto::OpenNewBuffer { project_id });
+            cx.spawn(|this, mut cx| async move {
+                let response = create.await?;
+                let buffer_id = BufferId::new(response.buffer_id)?;
+
+                this.update(&mut cx, |this, cx| {
+                    this.wait_for_remote_buffer(buffer_id, cx)
+                })?
+                .await
+            })
+        } else {
+            Task::ready(Ok(self.create_local_buffer("", None, cx)))
+        }
+    }
+
+    pub fn create_local_buffer(
+        &mut self,
+        text: &str,
+        language: Option<Arc<Language>>,
+        cx: &mut ModelContext<Self>,
+    ) -> Model<Buffer> {
+        let buffer = cx.new_model(|cx| {
+            Buffer::local(text, cx)
+                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
+        });
+        self.add_buffer(buffer.clone(), cx).log_err();
+        buffer
+    }
+
+    pub fn save_buffer(
+        &mut self,
+        buffer: Model<Buffer>,
+        cx: &mut ModelContext<Self>,
+    ) -> Task<Result<()>> {
+        let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
+            return Task::ready(Err(anyhow!("buffer doesn't have a file")));
+        };
+        match file.worktree.read(cx) {
+            Worktree::Local(_) => {
+                self.save_local_buffer(file.worktree.clone(), buffer, file.path.clone(), false, cx)
+            }
+            Worktree::Remote(tree) => self.save_remote_buffer(buffer, None, tree, cx),
+        }
+    }
+
+    pub fn save_buffer_as(
+        &mut self,
+        buffer: Model<Buffer>,
+        path: ProjectPath,
+        worktree: Model<Worktree>,
+        cx: &mut ModelContext<Self>,
+    ) -> Task<Result<()>> {
+        let old_file = File::from_dyn(buffer.read(cx).file())
+            .cloned()
+            .map(Arc::new);
+
+        let task = match worktree.read(cx) {
+            Worktree::Local(_) => {
+                self.save_local_buffer(worktree, buffer.clone(), path.path, true, cx)
+            }
+            Worktree::Remote(tree) => {
+                self.save_remote_buffer(buffer.clone(), Some(path.to_proto()), tree, cx)
+            }
+        };
+        cx.spawn(|this, mut cx| async move {
+            task.await?;
+            this.update(&mut cx, |_, cx| {
+                cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
+            })
+        })
+    }
+
+    fn save_local_buffer(
+        &self,
+        worktree: Model<Worktree>,
+        buffer_handle: Model<Buffer>,
+        path: Arc<Path>,
+        mut has_changed_file: bool,
+        cx: &mut ModelContext<Self>,
+    ) -> Task<Result<()>> {
+        let buffer = buffer_handle.read(cx);
+        let text = buffer.as_rope().clone();
+        let line_ending = buffer.line_ending();
+        let version = buffer.version();
+        if buffer.file().is_some_and(|file| !file.is_created()) {
+            has_changed_file = true;
+        }
+
+        let save = worktree.update(cx, |worktree, cx| {
+            worktree.write_file(path.as_ref(), text, line_ending, cx)
+        });
+
+        cx.spawn(move |this, mut cx| async move {
+            let new_file = save.await?;
+            let mtime = new_file.mtime;
+            buffer_handle.update(&mut cx, |buffer, cx| {
+                if has_changed_file {
+                    buffer.file_updated(new_file, cx);
+                }
+                buffer.did_save(version.clone(), mtime, cx);
+            })?;
+            this.update(&mut cx, |_, cx| {
+                cx.emit(BufferStoreEvent::BufferSaved {
+                    buffer: buffer_handle,
+                    has_changed_file,
+                    saved_version: version,
+                })
+            })?;
+            Ok(())
+        })
+    }
+
+    fn save_remote_buffer(
+        &self,
+        buffer_handle: Model<Buffer>,
+        new_path: Option<proto::ProjectPath>,
+        tree: &RemoteWorktree,
+        cx: &ModelContext<Self>,
+    ) -> Task<Result<()>> {
+        let buffer = buffer_handle.read(cx);
+        let buffer_id = buffer.remote_id().into();
+        let version = buffer.version();
+        let rpc = tree.client();
+        let project_id = tree.project_id();
+        cx.spawn(move |_, mut cx| async move {
+            let response = rpc
+                .request(proto::SaveBuffer {
+                    project_id,
+                    buffer_id,
+                    new_path,
+                    version: serialize_version(&version),
+                })
+                .await?;
+            let version = deserialize_version(&response.version);
+            let mtime = response.mtime.map(|mtime| mtime.into());
+
+            buffer_handle.update(&mut cx, |buffer, cx| {
+                buffer.did_save(version.clone(), mtime, cx);
+            })?;
+
+            Ok(())
+        })
+    }
+
+    fn add_buffer(&mut self, buffer: Model<Buffer>, cx: &mut ModelContext<Self>) -> Result<()> {
+        let remote_id = buffer.read(cx).remote_id();
+        let is_remote = buffer.read(cx).replica_id() != 0;
+        let open_buffer = if self.retain_buffers {
+            OpenBuffer::Strong(buffer.clone())
+        } else {
+            OpenBuffer::Weak(buffer.downgrade())
+        };
+
+        match self.opened_buffers.entry(remote_id) {
+            hash_map::Entry::Vacant(entry) => {
+                entry.insert(open_buffer);
+            }
+            hash_map::Entry::Occupied(mut entry) => {
+                if let OpenBuffer::Operations(operations) = entry.get_mut() {
+                    buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx))?;
+                } else if entry.get().upgrade().is_some() {
+                    if is_remote {
+                        return Ok(());
+                    } else {
+                        debug_panic!("buffer {} was already registered", remote_id);
+                        Err(anyhow!("buffer {} was already registered", remote_id))?;
+                    }
+                }
+                entry.insert(open_buffer);
+            }
+        }
+
+        if let Some(senders) = self.remote_buffer_listeners.remove(&remote_id) {
+            for sender in senders {
+                sender.send(Ok(buffer.clone())).ok();
+            }
+        }
+
+        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
+            if file.is_local {
+                self.local_buffer_ids_by_path.insert(
+                    ProjectPath {
+                        worktree_id: file.worktree_id(cx),
+                        path: file.path.clone(),
+                    },
+                    remote_id,
+                );
+
+                if let Some(entry_id) = file.entry_id {
+                    self.local_buffer_ids_by_entry_id
+                        .insert(entry_id, remote_id);
+                }
+            }
+        }
+
+        cx.emit(BufferStoreEvent::BufferAdded(buffer));
+        Ok(())
+    }
+
+    pub fn buffers(&self) -> impl '_ + Iterator<Item = Model<Buffer>> {
+        self.opened_buffers
+            .values()
+            .filter_map(|buffer| buffer.upgrade())
+    }
+
+    pub fn loading_buffers(
+        &self,
+    ) -> impl Iterator<
+        Item = (
+            &ProjectPath,
+            postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
+        ),
+    > {
+        self.loading_buffers_by_path
+            .iter()
+            .map(|(path, rx)| (path, rx.clone()))
+    }
+
+    pub fn get_by_path(&self, path: &ProjectPath, cx: &AppContext) -> Option<Model<Buffer>> {
+        self.buffers().find_map(|buffer| {
+            let file = File::from_dyn(buffer.read(cx).file())?;
+            if file.worktree_id(cx) == path.worktree_id && &file.path == &path.path {
+                Some(buffer)
+            } else {
+                None
+            }
+        })
+    }
+
+    pub fn get(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
+        self.opened_buffers
+            .get(&buffer_id)
+            .and_then(|buffer| buffer.upgrade())
+    }
+
+    pub fn get_existing(&self, buffer_id: BufferId) -> Result<Model<Buffer>> {
+        self.get(buffer_id)
+            .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))
+    }
+
+    pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option<Model<Buffer>> {
+        self.get(buffer_id)
+            .or_else(|| self.loading_remote_buffers_by_id.get(&buffer_id).cloned())
+    }
+
+    fn get_or_remove_by_path(
+        &mut self,
+        entry_id: ProjectEntryId,
+        project_path: &ProjectPath,
+    ) -> Option<(BufferId, Model<Buffer>)> {
+        let buffer_id = match self.local_buffer_ids_by_entry_id.get(&entry_id) {
+            Some(&buffer_id) => buffer_id,
+            None => match self.local_buffer_ids_by_path.get(project_path) {
+                Some(&buffer_id) => buffer_id,
+                None => {
+                    return None;
+                }
+            },
+        };
+        let buffer = if let Some(buffer) = self.get(buffer_id) {
+            buffer
+        } else {
+            self.opened_buffers.remove(&buffer_id);
+            self.local_buffer_ids_by_path.remove(project_path);
+            self.local_buffer_ids_by_entry_id.remove(&entry_id);
+            return None;
+        };
+        Some((buffer_id, buffer))
+    }
+
+    pub fn wait_for_remote_buffer(
+        &mut self,
+        id: BufferId,
+        cx: &mut AppContext,
+    ) -> Task<Result<Model<Buffer>>> {
+        let buffer = self.get(id);
+        if let Some(buffer) = buffer {
+            return Task::ready(Ok(buffer));
+        }
+        let (tx, rx) = oneshot::channel();
+        self.remote_buffer_listeners.entry(id).or_default().push(tx);
+        cx.background_executor().spawn(async move { rx.await? })
+    }
+
+    pub fn buffer_version_info(
+        &self,
+        cx: &AppContext,
+    ) -> (Vec<proto::BufferVersion>, Vec<BufferId>) {
+        let buffers = self
+            .buffers()
+            .map(|buffer| {
+                let buffer = buffer.read(cx);
+                proto::BufferVersion {
+                    id: buffer.remote_id().into(),
+                    version: language::proto::serialize_version(&buffer.version),
+                }
+            })
+            .collect();
+        let incomplete_buffer_ids = self
+            .loading_remote_buffers_by_id
+            .keys()
+            .copied()
+            .collect::<Vec<_>>();
+        (buffers, incomplete_buffer_ids)
+    }
+
+    pub fn disconnected_from_host(&mut self, cx: &mut AppContext) {
+        self.set_retain_buffers(false, cx);
+
+        for buffer in self.buffers() {
+            buffer.update(cx, |buffer, cx| {
+                buffer.set_capability(Capability::ReadOnly, cx)
+            });
+        }
+
+        // Wake up all futures currently waiting on a buffer to get opened,
+        // to give them a chance to fail now that we've disconnected.
+        self.remote_buffer_listeners.clear();
+    }
+
+    pub fn set_retain_buffers(&mut self, retain_buffers: bool, cx: &mut AppContext) {
+        self.retain_buffers = retain_buffers;
+        for open_buffer in self.opened_buffers.values_mut() {
+            if retain_buffers {
+                if let OpenBuffer::Weak(buffer) = open_buffer {
+                    if let Some(buffer) = buffer.upgrade() {
+                        *open_buffer = OpenBuffer::Strong(buffer);
+                    }
+                }
+            } else {
+                if let Some(buffer) = open_buffer.upgrade() {
+                    buffer.update(cx, |buffer, _| buffer.give_up_waiting());
+                }
+                if let OpenBuffer::Strong(buffer) = open_buffer {
+                    *open_buffer = OpenBuffer::Weak(buffer.downgrade());
+                }
+            }
+        }
+    }
+
+    pub fn discard_incomplete(&mut self) {
+        self.opened_buffers
+            .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_)));
+    }
+
+    pub fn file_changed(
+        &mut self,
+        path: Arc<Path>,
+        entry_id: ProjectEntryId,
+        worktree_handle: &Model<worktree::Worktree>,
+        snapshot: &worktree::Snapshot,
+        cx: &mut ModelContext<Self>,
+    ) -> Option<(Model<Buffer>, Arc<File>, Arc<File>)> {
+        let (buffer_id, buffer) = self.get_or_remove_by_path(
+            entry_id,
+            &ProjectPath {
+                worktree_id: snapshot.id(),
+                path,
+            },
+        )?;
+
+        let result = buffer.update(cx, |buffer, cx| {
+            let old_file = File::from_dyn(buffer.file())?;
+            if old_file.worktree != *worktree_handle {
+                return None;
+            }
+
+            let new_file = if let Some(entry) = old_file
+                .entry_id
+                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
+            {
+                File {
+                    is_local: true,
+                    entry_id: Some(entry.id),
+                    mtime: entry.mtime,
+                    path: entry.path.clone(),
+                    worktree: worktree_handle.clone(),
+                    is_deleted: false,
+                    is_private: entry.is_private,
+                }
+            } else if let Some(entry) = snapshot.entry_for_path(old_file.path.as_ref()) {
+                File {
+                    is_local: true,
+                    entry_id: Some(entry.id),
+                    mtime: entry.mtime,
+                    path: entry.path.clone(),
+                    worktree: worktree_handle.clone(),
+                    is_deleted: false,
+                    is_private: entry.is_private,
+                }
+            } else {
+                File {
+                    is_local: true,
+                    entry_id: old_file.entry_id,
+                    path: old_file.path.clone(),
+                    mtime: old_file.mtime,
+                    worktree: worktree_handle.clone(),
+                    is_deleted: true,
+                    is_private: old_file.is_private,
+                }
+            };
+
+            if new_file == *old_file {
+                return None;
+            }
+
+            let old_file = Arc::new(old_file.clone());
+            let new_file = Arc::new(new_file);
+            buffer.file_updated(new_file.clone(), cx);
+            Some((cx.handle(), old_file, new_file))
+        });
+
+        if let Some((buffer, old_file, new_file)) = &result {
+            if new_file.path != old_file.path {
+                self.local_buffer_ids_by_path.remove(&ProjectPath {
+                    path: old_file.path.clone(),
+                    worktree_id: old_file.worktree_id(cx),
+                });
+                self.local_buffer_ids_by_path.insert(
+                    ProjectPath {
+                        worktree_id: new_file.worktree_id(cx),
+                        path: new_file.path.clone(),
+                    },
+                    buffer_id,
+                );
+                cx.emit(BufferStoreEvent::BufferChangedFilePath {
+                    buffer: buffer.clone(),
+                    old_file: Some(old_file.clone()),
+                });
+            }
+
+            if new_file.entry_id != old_file.entry_id {
+                if let Some(entry_id) = old_file.entry_id {
+                    self.local_buffer_ids_by_entry_id.remove(&entry_id);
+                }
+                if let Some(entry_id) = new_file.entry_id {
+                    self.local_buffer_ids_by_entry_id
+                        .insert(entry_id, buffer_id);
+                }
+            }
+        }
+
+        result
+    }
+
+    pub fn buffer_changed_file(
+        &mut self,
+        buffer: Model<Buffer>,
+        cx: &mut AppContext,
+    ) -> Option<()> {
+        let file = File::from_dyn(buffer.read(cx).file())?;
+
+        let remote_id = buffer.read(cx).remote_id();
+        if let Some(entry_id) = file.entry_id {
+            match self.local_buffer_ids_by_entry_id.get(&entry_id) {
+                Some(_) => {
+                    return None;
+                }
+                None => {
+                    self.local_buffer_ids_by_entry_id
+                        .insert(entry_id, remote_id);
+                }
+            }
+        };
+        self.local_buffer_ids_by_path.insert(
+            ProjectPath {
+                worktree_id: file.worktree_id(cx),
+                path: file.path.clone(),
+            },
+            remote_id,
+        );
+
+        Some(())
+    }
+
+    pub async fn create_buffer_for_peer(
+        this: Model<Self>,
+        peer_id: PeerId,
+        buffer_id: BufferId,
+        project_id: u64,
+        client: AnyProtoClient,
+        cx: &mut AsyncAppContext,
+    ) -> Result<()> {
+        let Some(buffer) = this.update(cx, |this, _| this.get(buffer_id))? else {
+            return Ok(());
+        };
+
+        let operations = buffer.update(cx, |b, cx| b.serialize_ops(None, cx))?;
+        let operations = operations.await;
+        let state = buffer.update(cx, |buffer, _| buffer.to_proto())?;
+
+        let initial_state = proto::CreateBufferForPeer {
+            project_id,
+            peer_id: Some(peer_id),
+            variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
+        };
+
+        if client.send(initial_state).log_err().is_some() {
+            let client = client.clone();
+            cx.background_executor()
+                .spawn(async move {
+                    let mut chunks = split_operations(operations).peekable();
+                    while let Some(chunk) = chunks.next() {
+                        let is_last = chunks.peek().is_none();
+                        client.send(proto::CreateBufferForPeer {
+                            project_id,
+                            peer_id: Some(peer_id),
+                            variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
+                                proto::BufferChunk {
+                                    buffer_id: buffer_id.into(),
+                                    operations: chunk,
+                                    is_last,
+                                },
+                            )),
+                        })?;
+                    }
+                    anyhow::Ok(())
+                })
+                .await
+                .log_err();
+        }
+        Ok(())
+    }
+
+    pub fn handle_update_buffer(
+        &mut self,
+        envelope: TypedEnvelope<proto::UpdateBuffer>,
+        is_remote: bool,
+        cx: &mut AppContext,
+    ) -> Result<proto::Ack> {
+        let payload = envelope.payload.clone();
+        let buffer_id = BufferId::new(payload.buffer_id)?;
+        let ops = payload
+            .operations
+            .into_iter()
+            .map(language::proto::deserialize_operation)
+            .collect::<Result<Vec<_>, _>>()?;
+        match self.opened_buffers.entry(buffer_id) {
+            hash_map::Entry::Occupied(mut e) => match e.get_mut() {
+                OpenBuffer::Strong(buffer) => {
+                    buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
+                }
+                OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
+                OpenBuffer::Weak(_) => {}
+            },
+            hash_map::Entry::Vacant(e) => {
+                if !is_remote {
+                    debug_panic!(
+                        "received buffer update from {:?}",
+                        envelope.original_sender_id
+                    );
+                    return Err(anyhow!("received buffer update for non-remote project"));
+                }
+                e.insert(OpenBuffer::Operations(ops));
+            }
+        }
+        Ok(proto::Ack {})
+    }
+
+    pub fn handle_create_buffer_for_peer(
+        &mut self,
+        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
+        mut worktrees: impl Iterator<Item = Model<Worktree>>,
+        replica_id: u16,
+        capability: Capability,
+        cx: &mut ModelContext<Self>,
+    ) -> Result<()> {
+        match envelope
+            .payload
+            .variant
+            .ok_or_else(|| anyhow!("missing variant"))?
+        {
+            proto::create_buffer_for_peer::Variant::State(mut state) => {
+                let buffer_id = BufferId::new(state.id)?;
+
+                let buffer_result = maybe!({
+                    let mut buffer_file = None;
+                    if let Some(file) = state.file.take() {
+                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
+                        let worktree = worktrees
+                            .find(|worktree| worktree.read(cx).id() == worktree_id)
+                            .ok_or_else(|| {
+                                anyhow!("no worktree found for id {}", file.worktree_id)
+                            })?;
+                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
+                            as Arc<dyn language::File>);
+                    }
+                    Buffer::from_proto(replica_id, capability, state, buffer_file)
+                });
+
+                match buffer_result {
+                    Ok(buffer) => {
+                        let buffer = cx.new_model(|_| buffer);
+                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
+                    }
+                    Err(error) => {
+                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
+                            for listener in listeners {
+                                listener.send(Err(anyhow!(error.cloned()))).ok();
+                            }
+                        }
+                    }
+                }
+            }
+            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
+                let buffer_id = BufferId::new(chunk.buffer_id)?;
+                let buffer = self
+                    .loading_remote_buffers_by_id
+                    .get(&buffer_id)
+                    .cloned()
+                    .ok_or_else(|| {
+                        anyhow!(
+                            "received chunk for buffer {} without initial state",
+                            chunk.buffer_id
+                        )
+                    })?;
+
+                let result = maybe!({
+                    let operations = chunk
+                        .operations
+                        .into_iter()
+                        .map(language::proto::deserialize_operation)
+                        .collect::<Result<Vec<_>>>()?;
+                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))
+                });
+
+                if let Err(error) = result {
+                    self.loading_remote_buffers_by_id.remove(&buffer_id);
+                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
+                        for listener in listeners {
+                            listener.send(Err(error.cloned())).ok();
+                        }
+                    }
+                } else if chunk.is_last {
+                    self.loading_remote_buffers_by_id.remove(&buffer_id);
+                    self.add_buffer(buffer, cx)?;
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    pub async fn handle_save_buffer(
+        this: Model<Self>,
+        project_id: u64,
+        worktree: Option<Model<Worktree>>,
+        envelope: TypedEnvelope<proto::SaveBuffer>,
+        mut cx: AsyncAppContext,
+    ) -> Result<proto::BufferSaved> {
+        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
+        let buffer = this.update(&mut cx, |this, _| this.get_existing(buffer_id))??;
+        buffer
+            .update(&mut cx, |buffer, _| {
+                buffer.wait_for_version(deserialize_version(&envelope.payload.version))
+            })?
+            .await?;
+        let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?;
+
+        if let Some(new_path) = envelope.payload.new_path {
+            let worktree = worktree.context("no such worktree")?;
+            let new_path = ProjectPath::from_proto(new_path);
+            this.update(&mut cx, |this, cx| {
+                this.save_buffer_as(buffer.clone(), new_path, worktree, cx)
+            })?
+            .await?;
+        } else {
+            this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))?
+                .await?;
+        }
+
+        buffer.update(&mut cx, |buffer, _| proto::BufferSaved {
+            project_id,
+            buffer_id: buffer_id.into(),
+            version: serialize_version(buffer.saved_version()),
+            mtime: buffer.saved_mtime().map(|time| time.into()),
+        })
+    }
+
+    pub async fn wait_for_loading_buffer(
+        mut receiver: postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
+    ) -> Result<Model<Buffer>, Arc<anyhow::Error>> {
+        loop {
+            if let Some(result) = receiver.borrow().as_ref() {
+                match result {
+                    Ok(buffer) => return Ok(buffer.to_owned()),
+                    Err(e) => return Err(e.to_owned()),
+                }
+            }
+            receiver.next().await;
+        }
+    }
+}
+
+impl OpenBuffer {
+    fn upgrade(&self) -> Option<Model<Buffer>> {
+        match self {
+            OpenBuffer::Strong(handle) => Some(handle.clone()),
+            OpenBuffer::Weak(handle) => handle.upgrade(),
+            OpenBuffer::Operations(_) => None,
+        }
+    }
+}
+
+fn is_not_found_error(error: &anyhow::Error) -> bool {
+    error
+        .root_cause()
+        .downcast_ref::<io::Error>()
+        .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
+}
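
As a rough usage sketch of the ownership model described in the `BufferStore::new` doc comment: a sharing host wants strong handles, while a guest that loses its connection drops back to weak, read-only buffers. The wrapper functions below are assumptions for illustration; `set_retain_buffers` and `disconnected_from_host` are the methods added in this file, and the equivalent calls live inside `Project`.

```rust
use gpui::{AppContext, Model};
use project::buffer_store::BufferStore;

// Hypothetical helpers illustrating the retention toggle.
fn start_sharing(buffer_store: &Model<BufferStore>, cx: &mut AppContext) {
    // Host side: hold strong handles so buffers stay alive for collaborators.
    buffer_store.update(cx, |store, cx| store.set_retain_buffers(true, cx));
}

fn on_host_disconnected(buffer_store: &Model<BufferStore>, cx: &mut AppContext) {
    // Guest side: downgrade to weak handles and mark open buffers read-only.
    buffer_store.update(cx, |store, cx| store.disconnected_from_host(cx));
}
```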

crates/project/src/lsp_command.rs

@@ -410,16 +410,18 @@ impl LspCommand for PerformRename {
         message: proto::PerformRenameResponse,
         project: Model<Project>,
         _: Model<Buffer>,
-        mut cx: AsyncAppContext,
+        cx: AsyncAppContext,
     ) -> Result<ProjectTransaction> {
         let message = message
             .transaction
             .ok_or_else(|| anyhow!("missing transaction"))?;
-        project
-            .update(&mut cx, |project, cx| {
-                project.deserialize_project_transaction(message, self.push_to_history, cx)
-            })?
-            .await
+        Project::deserialize_project_transaction(
+            project.downgrade(),
+            message,
+            self.push_to_history,
+            cx,
+        )
+        .await
     }
 
     fn buffer_id_from_proto(message: &proto::PerformRename) -> Result<BufferId> {

crates/project/src/project.rs

@@ -1,3 +1,4 @@
+pub mod buffer_store;
 pub mod connection_manager;
 pub mod debounced_delay;
 pub mod lsp_command;
@@ -15,20 +16,17 @@ mod yarn;
 
 use anyhow::{anyhow, bail, Context as _, Result};
 use async_trait::async_trait;
+use buffer_store::{BufferStore, BufferStoreEvent};
 use client::{
     proto, Client, Collaborator, DevServerProjectId, PendingEntitySubscription, ProjectId,
     TypedEnvelope, UserStore,
 };
 use clock::ReplicaId;
-use collections::{btree_map, hash_map, BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
+use collections::{btree_map, BTreeMap, BTreeSet, HashMap, HashSet, VecDeque};
 use debounced_delay::DebouncedDelay;
 use futures::{
-    channel::{
-        mpsc::{self, UnboundedReceiver},
-        oneshot,
-    },
+    channel::mpsc::{self, UnboundedReceiver},
     future::{join_all, try_join_all, Shared},
-    prelude::future::BoxFuture,
     select,
     stream::FuturesUnordered,
     AsyncWriteExt, Future, FutureExt, StreamExt, TryFutureExt,
@@ -54,8 +52,8 @@ use language::{
     range_from_lsp, Bias, Buffer, BufferSnapshot, CachedLspAdapter, Capability, CodeLabel,
     ContextProvider, Diagnostic, DiagnosticEntry, DiagnosticSet, Diff, Documentation,
     Event as BufferEvent, File as _, Language, LanguageRegistry, LanguageServerName, LocalFile,
-    LspAdapterDelegate, Operation, Patch, PendingLanguageServer, PointUtf16, TextBufferSnapshot,
-    ToOffset, ToPointUtf16, Transaction, Unclipped,
+    LspAdapterDelegate, Patch, PendingLanguageServer, PointUtf16, TextBufferSnapshot, ToOffset,
+    ToPointUtf16, Transaction, Unclipped,
 };
 use log::error;
 use lsp::{
@@ -75,7 +73,7 @@ use postage::watch;
 use prettier_support::{DefaultPrettier, PrettierInstance};
 use project_settings::{DirenvSettings, LspSettings, ProjectSettings};
 use rand::prelude::*;
-use rpc::{ErrorCode, ErrorExt as _};
+use rpc::ErrorCode;
 use search::SearchQuery;
 use search_history::SearchHistory;
 use serde::Serialize;
@@ -93,7 +91,7 @@ use std::{
     env,
     ffi::OsStr,
     hash::Hash,
-    io, iter, mem,
+    iter, mem,
     num::NonZeroU32,
     ops::Range,
     path::{self, Component, Path, PathBuf},
@@ -116,7 +114,7 @@ use util::{
     debug_panic, defer, maybe, merge_json_value_into, parse_env_output, post_inc,
     NumericPrefixWithSuffix, ResultExt, TryFutureExt as _,
 };
-use worktree::{CreatedEntry, RemoteWorktreeClient, Snapshot, Traversal};
+use worktree::{CreatedEntry, Snapshot, Traversal};
 use yarn::YarnPathStore;
 
 pub use fs::*;
@@ -200,21 +198,12 @@ pub struct Project {
     client_state: ProjectClientState,
     collaborators: HashMap<proto::PeerId, Collaborator>,
     client_subscriptions: Vec<client::Subscription>,
+    buffer_store: Model<BufferStore>,
     _subscriptions: Vec<gpui::Subscription>,
-    loading_buffers: HashMap<BufferId, Vec<oneshot::Sender<Result<Model<Buffer>, anyhow::Error>>>>,
-    incomplete_remote_buffers: HashMap<BufferId, Model<Buffer>>,
     shared_buffers: HashMap<proto::PeerId, HashSet<BufferId>>,
     #[allow(clippy::type_complexity)]
-    loading_buffers_by_path: HashMap<
-        ProjectPath,
-        postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
-    >,
-    #[allow(clippy::type_complexity)]
     loading_local_worktrees:
         HashMap<Arc<Path>, Shared<Task<Result<Model<Worktree>, Arc<anyhow::Error>>>>>,
-    opened_buffers: HashMap<BufferId, OpenBuffer>,
-    local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
-    local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
     buffer_snapshots: HashMap<BufferId, HashMap<LanguageServerId, Vec<LspBufferSnapshot>>>, // buffer_id -> server_id -> vec of snapshots
     buffers_being_formatted: HashSet<BufferId>,
     buffers_needing_diff: HashSet<WeakModel<Buffer>>,
@@ -269,12 +258,6 @@ enum LocalProjectUpdate {
     },
 }
 
-enum OpenBuffer {
-    Strong(Model<Buffer>),
-    Weak(WeakModel<Buffer>),
-    Operations(Vec<Operation>),
-}
-
 #[derive(Clone)]
 enum WorktreeHandle {
     Strong(Model<Worktree>),
@@ -731,23 +714,24 @@ impl Project {
             let global_snippets_dir = paths::config_dir().join("snippets");
             let snippets =
                 SnippetProvider::new(fs.clone(), BTreeSet::from_iter([global_snippets_dir]), cx);
+
+            let buffer_store = cx.new_model(|_| BufferStore::new(false));
+            cx.subscribe(&buffer_store, Self::on_buffer_store_event)
+                .detach();
+
             let yarn = YarnPathStore::new(fs.clone(), cx);
+
             Self {
                 worktrees: Vec::new(),
                 worktrees_reordered: false,
                 buffer_ordered_messages_tx: tx,
                 collaborators: Default::default(),
-                opened_buffers: Default::default(),
+                buffer_store,
                 shared_buffers: Default::default(),
-                loading_buffers_by_path: Default::default(),
                 loading_local_worktrees: Default::default(),
-                local_buffer_ids_by_path: Default::default(),
-                local_buffer_ids_by_entry_id: Default::default(),
                 buffer_snapshots: Default::default(),
                 join_project_response_message_id: 0,
                 client_state: ProjectClientState::Local,
-                loading_buffers: HashMap::default(),
-                incomplete_remote_buffers: HashMap::default(),
                 client_subscriptions: Vec::new(),
                 _subscriptions: vec![
                     cx.observe_global::<SettingsStore>(Self::on_settings_changed),
@@ -864,13 +848,8 @@ impl Project {
             // That's because Worktree's identifier is entity id, which should probably be changed.
             let mut worktrees = Vec::new();
             for worktree in response.payload.worktrees {
-                let worktree = Worktree::remote(
-                    remote_id,
-                    replica_id,
-                    worktree,
-                    Box::new(CollabRemoteWorktreeClient(client.clone())),
-                    cx,
-                );
+                let worktree =
+                    Worktree::remote(remote_id, replica_id, worktree, client.clone().into(), cx);
                 worktrees.push(worktree);
             }
 
@@ -878,17 +857,17 @@ impl Project {
             cx.spawn(move |this, cx| Self::send_buffer_ordered_messages(this, rx, cx))
                 .detach();
 
+            let buffer_store = cx.new_model(|_| BufferStore::new(true));
+            cx.subscribe(&buffer_store, Self::on_buffer_store_event)
+                .detach();
+
             let mut this = Self {
                 worktrees: Vec::new(),
                 worktrees_reordered: false,
                 buffer_ordered_messages_tx: tx,
-                loading_buffers_by_path: Default::default(),
-                loading_buffers: Default::default(),
+                buffer_store,
                 shared_buffers: Default::default(),
-                incomplete_remote_buffers: Default::default(),
                 loading_local_worktrees: Default::default(),
-                local_buffer_ids_by_path: Default::default(),
-                local_buffer_ids_by_entry_id: Default::default(),
                 active_entry: None,
                 collaborators: Default::default(),
                 join_project_response_message_id: response.message_id,
@@ -939,7 +918,6 @@ impl Project {
                 last_workspace_edits_by_language_server: Default::default(),
                 language_server_watched_paths: HashMap::default(),
                 language_server_watcher_registrations: HashMap::default(),
-                opened_buffers: Default::default(),
                 buffers_being_formatted: Default::default(),
                 buffers_needing_diff: Default::default(),
                 git_diff_debouncer: DebouncedDelay::new(),
@@ -1142,22 +1120,20 @@ impl Project {
     fn on_settings_changed(&mut self, cx: &mut ModelContext<Self>) {
         let mut language_servers_to_start = Vec::new();
         let mut language_formatters_to_check = Vec::new();
-        for buffer in self.opened_buffers.values() {
-            if let Some(buffer) = buffer.upgrade() {
-                let buffer = buffer.read(cx);
-                let buffer_file = File::from_dyn(buffer.file());
-                let buffer_language = buffer.language();
-                let settings = language_settings(buffer_language, buffer.file(), cx);
-                if let Some(language) = buffer_language {
-                    if settings.enable_language_server {
-                        if let Some(file) = buffer_file {
-                            language_servers_to_start
-                                .push((file.worktree.clone(), Arc::clone(language)));
-                        }
+        for buffer in self.buffer_store.read(cx).buffers() {
+            let buffer = buffer.read(cx);
+            let buffer_file = File::from_dyn(buffer.file());
+            let buffer_language = buffer.language();
+            let settings = language_settings(buffer_language, buffer.file(), cx);
+            if let Some(language) = buffer_language {
+                if settings.enable_language_server {
+                    if let Some(file) = buffer_file {
+                        language_servers_to_start
+                            .push((file.worktree.clone(), Arc::clone(language)));
                     }
-                    language_formatters_to_check
-                        .push((buffer_file.map(|f| f.worktree_id(cx)), settings.clone()));
                 }
+                language_formatters_to_check
+                    .push((buffer_file.map(|f| f.worktree_id(cx)), settings.clone()));
             }
         }
 
@@ -1243,10 +1219,8 @@ impl Project {
         cx.notify();
     }
 
-    pub fn buffer_for_id(&self, remote_id: BufferId) -> Option<Model<Buffer>> {
-        self.opened_buffers
-            .get(&remote_id)
-            .and_then(|buffer| buffer.upgrade())
+    pub fn buffer_for_id(&self, remote_id: BufferId, cx: &AppContext) -> Option<Model<Buffer>> {
+        self.buffer_store.read(cx).get(remote_id)
     }
 
     pub fn languages(&self) -> &Arc<LanguageRegistry> {
@@ -1265,30 +1239,16 @@ impl Project {
         self.node.as_ref()
     }
 
-    pub fn opened_buffers(&self) -> Vec<Model<Buffer>> {
-        self.opened_buffers
-            .values()
-            .filter_map(|b| b.upgrade())
-            .collect()
+    pub fn opened_buffers(&self, cx: &AppContext) -> Vec<Model<Buffer>> {
+        self.buffer_store.read(cx).buffers().collect()
     }
 
     #[cfg(any(test, feature = "test-support"))]
     pub fn has_open_buffer(&self, path: impl Into<ProjectPath>, cx: &AppContext) -> bool {
-        let path = path.into();
-        if let Some(worktree) = self.worktree_for_id(path.worktree_id, cx) {
-            self.opened_buffers.iter().any(|(_, buffer)| {
-                if let Some(buffer) = buffer.upgrade() {
-                    if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
-                        if file.worktree == worktree && file.path() == &path.path {
-                            return true;
-                        }
-                    }
-                }
-                false
-            })
-        } else {
-            false
-        }
+        self.buffer_store
+            .read(cx)
+            .get_by_path(&path.into(), cx)
+            .is_some()
     }
 
     pub fn fs(&self) -> &Arc<dyn Fs> {
@@ -1544,17 +1504,9 @@ impl Project {
                 .set_model(&cx.handle(), &mut cx.to_async()),
         );
 
-        for open_buffer in self.opened_buffers.values_mut() {
-            match open_buffer {
-                OpenBuffer::Strong(_) => {}
-                OpenBuffer::Weak(buffer) => {
-                    if let Some(buffer) = buffer.upgrade() {
-                        *open_buffer = OpenBuffer::Strong(buffer);
-                    }
-                }
-                OpenBuffer::Operations(_) => unreachable!(),
-            }
-        }
+        self.buffer_store.update(cx, |buffer_store, cx| {
+            buffer_store.set_retain_buffers(true, cx)
+        });
 
         for worktree_handle in self.worktrees.iter_mut() {
             match worktree_handle {
@@ -1654,58 +1606,30 @@ impl Project {
                             })??;
                         }
                         LocalProjectUpdate::CreateBufferForPeer { peer_id, buffer_id } => {
-                            let buffer = this.update(&mut cx, |this, _| {
-                                let buffer = this.opened_buffers.get(&buffer_id).unwrap();
-                                let shared_buffers =
-                                    this.shared_buffers.entry(peer_id).or_default();
-                                if shared_buffers.insert(buffer_id) {
-                                    if let OpenBuffer::Strong(buffer) = buffer {
-                                        Some(buffer.clone())
-                                    } else {
-                                        None
-                                    }
+                            let Some(buffer_store) = this.update(&mut cx, |this, _| {
+                                if this
+                                    .shared_buffers
+                                    .entry(peer_id)
+                                    .or_default()
+                                    .insert(buffer_id)
+                                {
+                                    Some(this.buffer_store.clone())
                                 } else {
                                     None
                                 }
-                            })?;
-
-                            let Some(buffer) = buffer else { continue };
-                            let operations =
-                                buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?;
-                            let operations = operations.await;
-                            let state = buffer.update(&mut cx, |buffer, _| buffer.to_proto())?;
-
-                            let initial_state = proto::CreateBufferForPeer {
-                                project_id,
-                                peer_id: Some(peer_id),
-                                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
+                            })?
+                            else {
+                                continue;
                             };
-                            if client.send(initial_state).log_err().is_some() {
-                                let client = client.clone();
-                                cx.background_executor()
-                                    .spawn(async move {
-                                        let mut chunks = split_operations(operations).peekable();
-                                        while let Some(chunk) = chunks.next() {
-                                            let is_last = chunks.peek().is_none();
-                                            client.send(proto::CreateBufferForPeer {
-                                                project_id,
-                                                peer_id: Some(peer_id),
-                                                variant: Some(
-                                                    proto::create_buffer_for_peer::Variant::Chunk(
-                                                        proto::BufferChunk {
-                                                            buffer_id: buffer_id.into(),
-                                                            operations: chunk,
-                                                            is_last,
-                                                        },
-                                                    ),
-                                                ),
-                                            })?;
-                                        }
-                                        anyhow::Ok(())
-                                    })
-                                    .await
-                                    .log_err();
-                            }
+                            BufferStore::create_buffer_for_peer(
+                                buffer_store,
+                                peer_id,
+                                buffer_id,
+                                project_id,
+                                client.clone().into(),
+                                &mut cx,
+                            )
+                            .await?;
                         }
                     }
                 }
@@ -1807,16 +1731,9 @@ impl Project {
                 }
             }
 
-            for open_buffer in self.opened_buffers.values_mut() {
-                // Wake up any tasks waiting for peers' edits to this buffer.
-                if let Some(buffer) = open_buffer.upgrade() {
-                    buffer.update(cx, |buffer, _| buffer.give_up_waiting());
-                }
-
-                if let OpenBuffer::Strong(buffer) = open_buffer {
-                    *open_buffer = OpenBuffer::Weak(buffer.downgrade());
-                }
-            }
+            self.buffer_store.update(cx, |buffer_store, cx| {
+                buffer_store.set_retain_buffers(false, cx)
+            });
 
             self.client
                 .send(proto::UnshareProject {
@@ -1852,7 +1769,7 @@ impl Project {
             }
 
             *capability = new_capability;
-            for buffer in self.opened_buffers() {
+            for buffer in self.opened_buffers(cx) {
                 buffer.update(cx, |buffer, cx| buffer.set_capability(new_capability, cx));
             }
         }
@@ -1878,24 +1795,9 @@ impl Project {
                 }
             }
 
-            for open_buffer in self.opened_buffers.values_mut() {
-                // Wake up any tasks waiting for peers' edits to this buffer.
-                if let Some(buffer) = open_buffer.upgrade() {
-                    buffer.update(cx, |buffer, cx| {
-                        buffer.give_up_waiting();
-                        buffer.set_capability(Capability::ReadOnly, cx)
-                    });
-                }
-
-                if let OpenBuffer::Strong(buffer) = open_buffer {
-                    *open_buffer = OpenBuffer::Weak(buffer.downgrade());
-                }
-            }
-
-            // Wake up all futures currently waiting on a buffer to get opened,
-            // to give them a chance to fail now that we've disconnected.
-            self.loading_buffers.clear();
-            // self.opened_buffer.send(OpenedBufferEvent::Disconnected);
+            self.buffer_store.update(cx, |buffer_store, cx| {
+                buffer_store.disconnected_from_host(cx)
+            });
         }
     }
 
@@ -1936,22 +1838,16 @@ impl Project {
     }
 
     pub fn create_buffer(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<Model<Buffer>>> {
-        if self.is_remote() {
-            let create = self.client.request(proto::OpenNewBuffer {
-                project_id: self.remote_id().unwrap(),
-            });
-            cx.spawn(|this, mut cx| async move {
-                let response = create.await?;
-                let buffer_id = BufferId::new(response.buffer_id)?;
-
-                this.update(&mut cx, |this, cx| {
-                    this.wait_for_remote_buffer(buffer_id, cx)
-                })?
-                .await
-            })
-        } else {
-            Task::ready(Ok(self.create_local_buffer("", None, cx)))
-        }
+        self.buffer_store.update(cx, |buffer_store, cx| {
+            buffer_store.create_buffer(
+                if self.is_remote() {
+                    Some((self.client.clone().into(), self.remote_id().unwrap()))
+                } else {
+                    None
+                },
+                cx,
+            )
+        })
     }
 
     pub fn create_local_buffer(
@@ -1963,13 +1859,9 @@ impl Project {
         if self.is_remote() {
             panic!("called create_local_buffer on a remote project")
         }
-        let buffer = cx.new_model(|cx| {
-            Buffer::local(text, cx)
-                .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx)
-        });
-        self.register_buffer(&buffer, cx)
-            .expect("creating local buffers always succeeds");
-        buffer
+        self.buffer_store.update(cx, |buffer_store, cx| {
+            buffer_store.create_local_buffer(text, language, cx)
+        })
     }
 
     pub fn open_path(
@@ -2037,133 +1929,12 @@ impl Project {
             return Task::ready(Err(anyhow!("no such worktree")));
         };
 
-        // If there is already a buffer for the given path, then return it.
-        let existing_buffer = self.get_open_buffer(&project_path, cx);
-        if let Some(existing_buffer) = existing_buffer {
-            return Task::ready(Ok(existing_buffer));
-        }
-
-        let loading_watch = match self.loading_buffers_by_path.entry(project_path.clone()) {
-            // If the given path is already being loaded, then wait for that existing
-            // task to complete and return the same buffer.
-            hash_map::Entry::Occupied(e) => e.get().clone(),
-
-            // Otherwise, record the fact that this path is now being loaded.
-            hash_map::Entry::Vacant(entry) => {
-                let (mut tx, rx) = postage::watch::channel();
-                entry.insert(rx.clone());
-
-                let project_path = project_path.clone();
-                let load_buffer = if worktree.read(cx).is_local() {
-                    self.open_local_buffer_internal(project_path.path.clone(), worktree, cx)
-                } else {
-                    self.open_remote_buffer_internal(&project_path.path, &worktree, cx)
-                };
-
-                cx.spawn(move |this, mut cx| async move {
-                    let load_result = load_buffer.await;
-                    *tx.borrow_mut() = Some(this.update(&mut cx, |this, _| {
-                        // Record the fact that the buffer is no longer loading.
-                        this.loading_buffers_by_path.remove(&project_path);
-                        let buffer = load_result.map_err(Arc::new)?;
-                        Ok(buffer)
-                    })?);
-                    anyhow::Ok(())
-                })
-                .detach();
-                rx
-            }
-        };
-
-        cx.background_executor().spawn(async move {
-            wait_for_loading_buffer(loading_watch)
-                .await
-                .map_err(|e| e.cloned())
-        })
-    }
-
-    fn open_local_buffer_internal(
-        &mut self,
-        path: Arc<Path>,
-        worktree: Model<Worktree>,
-        cx: &mut ModelContext<Self>,
-    ) -> Task<Result<Model<Buffer>>> {
-        let load_buffer = worktree.update(cx, |worktree, cx| {
-            let load_file = worktree.load_file(path.as_ref(), cx);
-            let reservation = cx.reserve_model();
-            let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
-            cx.spawn(move |_, mut cx| async move {
-                let loaded = load_file.await?;
-                let text_buffer = cx
-                    .background_executor()
-                    .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
-                    .await;
-                cx.insert_model(reservation, |_| {
-                    Buffer::build(
-                        text_buffer,
-                        loaded.diff_base,
-                        Some(loaded.file),
-                        Capability::ReadWrite,
-                    )
-                })
-            })
-        });
-
-        cx.spawn(move |this, mut cx| async move {
-            let buffer = match load_buffer.await {
-                Ok(buffer) => Ok(buffer),
-                Err(error) if is_not_found_error(&error) => cx.new_model(|cx| {
-                    let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
-                    let text_buffer = text::Buffer::new(0, buffer_id, "".into());
-                    Buffer::build(
-                        text_buffer,
-                        None,
-                        Some(Arc::new(File {
-                            worktree,
-                            path,
-                            mtime: None,
-                            entry_id: None,
-                            is_local: true,
-                            is_deleted: false,
-                            is_private: false,
-                        })),
-                        Capability::ReadWrite,
-                    )
-                }),
-                Err(e) => Err(e),
-            }?;
-            this.update(&mut cx, |this, cx| this.register_buffer(&buffer, cx))??;
-            Ok(buffer)
-        })
-    }
-
-    fn open_remote_buffer_internal(
-        &mut self,
-        path: &Arc<Path>,
-        worktree: &Model<Worktree>,
-        cx: &mut ModelContext<Self>,
-    ) -> Task<Result<Model<Buffer>>> {
-        let rpc = self.client.clone();
-        let project_id = self.remote_id().unwrap();
-        let remote_worktree_id = worktree.read(cx).id();
-        let path = path.clone();
-        let path_string = path.to_string_lossy().to_string();
-        if self.is_disconnected() {
+        if self.is_remote() && self.is_disconnected() {
             return Task::ready(Err(anyhow!(ErrorCode::Disconnected)));
         }
-        cx.spawn(move |this, mut cx| async move {
-            let response = rpc
-                .request(proto::OpenBufferByPath {
-                    project_id,
-                    worktree_id: remote_worktree_id.to_proto(),
-                    path: path_string,
-                })
-                .await?;
-            let buffer_id = BufferId::new(response.buffer_id)?;
-            this.update(&mut cx, |this, cx| {
-                this.wait_for_remote_buffer(buffer_id, cx)
-            })?
-            .await
+
+        self.buffer_store.update(cx, |buffer_store, cx| {
+            buffer_store.open_buffer(project_path, worktree, cx)
         })
     }
 
@@ -2246,7 +2017,7 @@ impl Project {
         id: BufferId,
         cx: &mut ModelContext<Self>,
     ) -> Task<Result<Model<Buffer>>> {
-        if let Some(buffer) = self.buffer_for_id(id) {
+        if let Some(buffer) = self.buffer_for_id(id, cx) {
             Task::ready(Ok(buffer))
         } else if self.is_local() {
             Task::ready(Err(anyhow!("buffer {} does not exist", id)))
@@ -2287,16 +2058,8 @@ impl Project {
         buffer: Model<Buffer>,
         cx: &mut ModelContext<Self>,
     ) -> Task<Result<()>> {
-        let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
-            return Task::ready(Err(anyhow!("buffer doesn't have a file")));
-        };
-        let worktree = file.worktree.clone();
-        let path = file.path.clone();
-        if self.is_local() {
-            self.save_local_buffer(worktree, buffer, path, false, cx)
-        } else {
-            self.save_remote_buffer(buffer, None, cx)
-        }
+        self.buffer_store
+            .update(cx, |buffer_store, cx| buffer_store.save_buffer(buffer, cx))
     }
 
     pub fn save_buffer_as(
@@ -2305,121 +2068,11 @@ impl Project {
         path: ProjectPath,
         cx: &mut ModelContext<Self>,
     ) -> Task<Result<()>> {
-        let old_file = File::from_dyn(buffer.read(cx).file()).cloned();
         let Some(worktree) = self.worktree_for_id(path.worktree_id, cx) else {
             return Task::ready(Err(anyhow!("worktree does not exist")));
         };
-
-        cx.spawn(move |this, mut cx| async move {
-            this.update(&mut cx, |this, cx| {
-                if this.is_local() {
-                    if let Some(old_file) = &old_file {
-                        this.unregister_buffer_from_language_servers(&buffer, old_file, cx);
-                    }
-                    this.save_local_buffer(worktree, buffer.clone(), path.path, true, cx)
-                } else {
-                    this.save_remote_buffer(buffer.clone(), Some(path.to_proto()), cx)
-                }
-            })?
-            .await?;
-
-            this.update(&mut cx, |this, cx| {
-                this.detect_language_for_buffer(&buffer, cx);
-                this.register_buffer_with_language_servers(&buffer, cx);
-            })?;
-            Ok(())
-        })
-    }
-
-    pub fn save_local_buffer(
-        &self,
-        worktree: Model<Worktree>,
-        buffer_handle: Model<Buffer>,
-        path: Arc<Path>,
-        mut has_changed_file: bool,
-        cx: &mut ModelContext<Self>,
-    ) -> Task<Result<()>> {
-        let buffer = buffer_handle.read(cx);
-        let buffer_id = buffer.remote_id();
-        let text = buffer.as_rope().clone();
-        let line_ending = buffer.line_ending();
-        let version = buffer.version();
-        if buffer.file().is_some_and(|file| !file.is_created()) {
-            has_changed_file = true;
-        }
-
-        let save = worktree.update(cx, |worktree, cx| {
-            worktree.write_file(path.as_ref(), text, line_ending, cx)
-        });
-
-        let client = self.client.clone();
-        let project_id = self.remote_id();
-        cx.spawn(move |_, mut cx| async move {
-            let new_file = save.await?;
-            let mtime = new_file.mtime;
-            if has_changed_file {
-                if let Some(project_id) = project_id {
-                    client
-                        .send(proto::UpdateBufferFile {
-                            project_id,
-                            buffer_id: buffer_id.into(),
-                            file: Some(new_file.to_proto()),
-                        })
-                        .log_err();
-                }
-
-                buffer_handle.update(&mut cx, |buffer, cx| {
-                    if has_changed_file {
-                        buffer.file_updated(new_file, cx);
-                    }
-                })?;
-            }
-
-            if let Some(project_id) = project_id {
-                client.send(proto::BufferSaved {
-                    project_id,
-                    buffer_id: buffer_id.into(),
-                    version: serialize_version(&version),
-                    mtime: mtime.map(|time| time.into()),
-                })?;
-            }
-
-            buffer_handle.update(&mut cx, |buffer, cx| {
-                buffer.did_save(version.clone(), mtime, cx);
-            })?;
-
-            Ok(())
-        })
-    }
-
-    pub fn save_remote_buffer(
-        &self,
-        buffer_handle: Model<Buffer>,
-        new_path: Option<proto::ProjectPath>,
-        cx: &mut ModelContext<Self>,
-    ) -> Task<Result<()>> {
-        let buffer = buffer_handle.read(cx);
-        let buffer_id = buffer.remote_id().into();
-        let version = buffer.version();
-        let rpc = self.client.clone();
-        let project_id = self.remote_id();
-        cx.spawn(move |_, mut cx| async move {
-            let response = rpc
-                .request(proto::SaveBuffer {
-                    project_id: project_id.ok_or_else(|| anyhow!("project_id is not set"))?,
-                    buffer_id,
-                    new_path,
-                    version: serialize_version(&version),
-                })
-                .await?;
-            let version = deserialize_version(&response.version);
-            let mtime = response.mtime.map(|mtime| mtime.into());
-
-            buffer_handle.update(&mut cx, |buffer, cx| {
-                buffer.did_save(version.clone(), mtime, cx);
-            })?;
-
-            Ok(())
+        self.buffer_store.update(cx, |buffer_store, cx| {
+            buffer_store.save_buffer_as(buffer.clone(), path, worktree, cx)
         })
     }
 
@@ -2428,16 +2081,7 @@ impl Project {
         path: &ProjectPath,
         cx: &mut ModelContext<Self>,
     ) -> Option<Model<Buffer>> {
-        let worktree = self.worktree_for_id(path.worktree_id, cx)?;
-        self.opened_buffers.values().find_map(|buffer| {
-            let buffer = buffer.upgrade()?;
-            let file = File::from_dyn(buffer.read(cx).file())?;
-            if file.worktree == worktree && file.path() == &path.path {
-                Some(buffer)
-            } else {
-                None
-            }
-        })
+        self.buffer_store.read(cx).get_by_path(path, cx)
     }
 
     fn register_buffer(
@@ -2450,54 +2094,11 @@ impl Project {
             buffer.set_language_registry(self.languages.clone())
         });
 
-        let remote_id = buffer.read(cx).remote_id();
-        let is_remote = self.is_remote();
-        let open_buffer = if is_remote || self.is_shared() {
-            OpenBuffer::Strong(buffer.clone())
-        } else {
-            OpenBuffer::Weak(buffer.downgrade())
-        };
-
-        match self.opened_buffers.entry(remote_id) {
-            hash_map::Entry::Vacant(entry) => {
-                entry.insert(open_buffer);
-            }
-            hash_map::Entry::Occupied(mut entry) => {
-                if let OpenBuffer::Operations(operations) = entry.get_mut() {
-                    buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx))?;
-                } else if entry.get().upgrade().is_some() {
-                    if is_remote {
-                        return Ok(());
-                    } else {
-                        debug_panic!("buffer {} was already registered", remote_id);
-                        Err(anyhow!("buffer {} was already registered", remote_id))?;
-                    }
-                }
-                entry.insert(open_buffer);
-            }
-        }
         cx.subscribe(buffer, |this, buffer, event, cx| {
             this.on_buffer_event(buffer, event, cx);
         })
         .detach();
 
-        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
-            if file.is_local {
-                self.local_buffer_ids_by_path.insert(
-                    ProjectPath {
-                        worktree_id: file.worktree_id(cx),
-                        path: file.path.clone(),
-                    },
-                    remote_id,
-                );
-
-                if let Some(entry_id) = file.entry_id {
-                    self.local_buffer_ids_by_entry_id
-                        .insert(entry_id, remote_id);
-                }
-            }
-        }
-
         self.detect_language_for_buffer(buffer, cx);
         self.register_buffer_with_language_servers(buffer, cx);
         cx.observe_release(buffer, |this, buffer, cx| {
@@ -2519,11 +2120,6 @@ impl Project {
         })
         .detach();
 
-        if let Some(senders) = self.loading_buffers.remove(&remote_id) {
-            for sender in senders {
-                sender.send(Ok(buffer.clone())).ok();
-            }
-        }
         Ok(())
     }
 
@@ -2617,7 +2213,7 @@ impl Project {
         &mut self,
         buffer: &Model<Buffer>,
         old_file: &File,
-        cx: &mut ModelContext<Self>,
+        cx: &mut AppContext,
     ) {
         let old_path = match old_file.as_local() {
             Some(local) => local.abs_path(cx),
@@ -2758,6 +2354,57 @@ impl Project {
         Ok(())
     }
 
+    fn on_buffer_store_event(
+        &mut self,
+        _: Model<BufferStore>,
+        event: &BufferStoreEvent,
+        cx: &mut ModelContext<Self>,
+    ) {
+        match event {
+            BufferStoreEvent::BufferAdded(buffer) => {
+                self.register_buffer(buffer, cx).log_err();
+            }
+            BufferStoreEvent::BufferChangedFilePath { buffer, old_file } => {
+                if let Some(old_file) = &old_file {
+                    self.unregister_buffer_from_language_servers(&buffer, old_file, cx);
+                }
+
+                self.detect_language_for_buffer(&buffer, cx);
+                self.register_buffer_with_language_servers(&buffer, cx);
+            }
+            BufferStoreEvent::BufferSaved {
+                buffer: buffer_handle,
+                has_changed_file,
+                saved_version,
+            } => {
+                let buffer = buffer_handle.read(cx);
+                let buffer_id = buffer.remote_id();
+                let Some(new_file) = buffer.file() else {
+                    return;
+                };
+                if let Some(project_id) = self.remote_id() {
+                    self.client
+                        .send(proto::BufferSaved {
+                            project_id,
+                            buffer_id: buffer_id.into(),
+                            version: serialize_version(&saved_version),
+                            mtime: new_file.mtime().map(|time| time.into()),
+                        })
+                        .log_err();
+                    if *has_changed_file {
+                        self.client
+                            .send(proto::UpdateBufferFile {
+                                project_id,
+                                buffer_id: buffer_id.into(),
+                                file: Some(new_file.to_proto()),
+                            })
+                            .log_err();
+                    }
+                }
+            }
+        }
+    }
+
     fn on_buffer_event(
         &mut self,
         buffer: Model<Buffer>,
@@ -2916,30 +2563,11 @@ impl Project {
                     self.simulate_disk_based_diagnostics_events_if_needed(language_server_id, cx);
                 }
             }
-            BufferEvent::FileHandleChanged => {
-                let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
-                    return None;
-                };
 
-                let remote_id = buffer.read(cx).remote_id();
-                if let Some(entry_id) = file.entry_id {
-                    match self.local_buffer_ids_by_entry_id.get(&entry_id) {
-                        Some(_) => {
-                            return None;
-                        }
-                        None => {
-                            self.local_buffer_ids_by_entry_id
-                                .insert(entry_id, remote_id);
-                        }
-                    }
-                };
-                self.local_buffer_ids_by_path.insert(
-                    ProjectPath {
-                        worktree_id: file.worktree_id(cx),
-                        path: file.path.clone(),
-                    },
-                    remote_id,
-                );
+            BufferEvent::FileHandleChanged => {
+                self.buffer_store.update(cx, |buffer_store, cx| {
+                    buffer_store.buffer_changed_file(buffer, cx)
+                })?;
             }
             _ => {}
         }
@@ -3103,21 +2731,20 @@ impl Project {
                         prev_reload_count = reload_count;
                         project
                             .update(&mut cx, |this, cx| {
-                                let buffers = this
-                                    .opened_buffers
-                                    .values()
-                                    .filter_map(|b| b.upgrade())
-                                    .collect::<Vec<_>>();
-                                for buffer in buffers {
-                                    if let Some(f) = File::from_dyn(buffer.read(cx).file()).cloned()
-                                    {
-                                        this.unregister_buffer_from_language_servers(
-                                            &buffer, &f, cx,
-                                        );
-                                        buffer
-                                            .update(cx, |buffer, cx| buffer.set_language(None, cx));
+                                this.buffer_store.clone().update(cx, |buffer_store, cx| {
+                                    for buffer in buffer_store.buffers() {
+                                        if let Some(f) =
+                                            File::from_dyn(buffer.read(cx).file()).cloned()
+                                        {
+                                            this.unregister_buffer_from_language_servers(
+                                                &buffer, &f, cx,
+                                            );
+                                            buffer.update(cx, |buffer, cx| {
+                                                buffer.set_language(None, cx)
+                                            });
+                                        }
                                     }
-                                }
+                                });
                             })
                             .ok();
                     }
@@ -3126,16 +2753,14 @@ impl Project {
                         .update(&mut cx, |project, cx| {
                             let mut plain_text_buffers = Vec::new();
                             let mut buffers_with_unknown_injections = Vec::new();
-                            for buffer in project.opened_buffers.values() {
-                                if let Some(handle) = buffer.upgrade() {
-                                    let buffer = &handle.read(cx);
-                                    if buffer.language().is_none()
-                                        || buffer.language() == Some(&*language::PLAIN_TEXT)
-                                    {
-                                        plain_text_buffers.push(handle);
-                                    } else if buffer.contains_unknown_injections() {
-                                        buffers_with_unknown_injections.push(handle);
-                                    }
+                            for handle in project.buffer_store.read(cx).buffers() {
+                                let buffer = handle.read(cx);
+                                if buffer.language().is_none()
+                                    || buffer.language() == Some(&*language::PLAIN_TEXT)
+                                {
+                                    plain_text_buffers.push(handle);
+                                } else if buffer.contains_unknown_injections() {
+                                    buffers_with_unknown_injections.push(handle);
                                 }
                             }
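
After these hunks, `Project` no longer maintains its own `opened_buffers`, `loading_buffers_by_path`, or `local_buffer_ids_*` maps; it reads and updates the `buffer_store` model and coordinates with it through the new `on_buffer_store_event` handler. The sketch below shows the shape of that wiring; the helper names are illustrative only, and the actual subscription happens wherever `Project` constructs its `BufferStore`, which is outside this excerpt.

```rust
use gpui::{AppContext, ModelContext};

impl Project {
    // Hypothetical helper, shown only to illustrate the subscription; the
    // real code sets this up when the BufferStore model is created.
    fn wire_up_buffer_store(&mut self, cx: &mut ModelContext<Self>) {
        // on_buffer_store_event (added above) registers newly added buffers,
        // re-registers them with language servers when their path changes,
        // and relays BufferSaved / UpdateBufferFile to collaborators.
        cx.subscribe(&self.buffer_store, Self::on_buffer_store_event)
            .detach();
    }

    // Reads now go through BufferStore rather than a local HashMap, which is
    // why buffer_for_id and opened_buffers gained a `cx` parameter above.
    fn open_buffer_count(&self, cx: &AppContext) -> usize {
        self.buffer_store.read(cx).buffers().count()
    }
}
```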
 

crates/project/src/project_tests.rs 🔗

@@ -3056,15 +3056,8 @@ async fn test_rescan_and_remote_updates(cx: &mut gpui::TestAppContext) {
         });
     });
 
-    let remote = cx.update(|cx| {
-        Worktree::remote(
-            0,
-            1,
-            metadata,
-            Box::new(CollabRemoteWorktreeClient(project.read(cx).client())),
-            cx,
-        )
-    });
+    let remote =
+        cx.update(|cx| Worktree::remote(0, 1, metadata, project.read(cx).client().into(), cx));
 
     cx.executor().run_until_parked();
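
The `.into()` in this test converts the project's `Arc<Client>` into the new type-erased `AnyProtoClient` (added in `crates/proto/src/proto.rs` below), which is what `Worktree::remote` now accepts instead of a `Box<dyn RemoteWorktreeClient>`. Any other `ProtoClient` implementor would work the same way; a rough sketch, with the helper function and exact import paths as assumptions:

```rust
use std::sync::Arc;

use gpui::{AppContext, Model};
use rpc::proto::{self, AnyProtoClient, ProtoClient};
use worktree::Worktree;

// Illustrative only: open a remote worktree over whatever client is at hand.
fn open_remote_worktree<T: ProtoClient + 'static>(
    project_id: u64,
    client: Arc<T>,
    metadata: proto::WorktreeMetadata,
    cx: &mut AppContext,
) -> Model<Worktree> {
    // From<Arc<T>> for AnyProtoClient does the type erasure.
    let client: AnyProtoClient = client.into();
    Worktree::remote(project_id, /* replica_id */ 1, metadata, client, cx)
}
```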
 

crates/proto/Cargo.toml 🔗

@@ -19,6 +19,7 @@ doctest = false
 [dependencies]
 anyhow.workspace = true
 collections.workspace = true
+futures.workspace = true
 prost.workspace = true
 serde.workspace = true
 

crates/proto/src/proto.rs 🔗

@@ -7,18 +7,19 @@ mod typed_envelope;
 pub use error::*;
 pub use typed_envelope::*;
 
+use anyhow::anyhow;
 use collections::HashMap;
+use futures::{future::BoxFuture, Future};
 pub use prost::{DecodeError, Message};
 use serde::Serialize;
-use std::any::{Any, TypeId};
-use std::time::Instant;
 use std::{
+    any::{Any, TypeId},
     cmp,
-    fmt::Debug,
-    iter,
-    time::{Duration, SystemTime, UNIX_EPOCH},
+    fmt::{self, Debug},
+    iter, mem,
+    sync::Arc,
+    time::{Duration, Instant, SystemTime, UNIX_EPOCH},
 };
-use std::{fmt, mem};
 
 include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
 
@@ -59,6 +60,51 @@ pub enum MessagePriority {
     Background,
 }
 
+pub trait ProtoClient: Send + Sync {
+    fn request(
+        &self,
+        envelope: Envelope,
+        request_type: &'static str,
+    ) -> BoxFuture<'static, anyhow::Result<Envelope>>;
+
+    fn send(&self, envelope: Envelope) -> anyhow::Result<()>;
+}
+
+#[derive(Clone)]
+pub struct AnyProtoClient(Arc<dyn ProtoClient>);
+
+impl<T> From<Arc<T>> for AnyProtoClient
+where
+    T: ProtoClient + 'static,
+{
+    fn from(client: Arc<T>) -> Self {
+        Self(client)
+    }
+}
+
+impl AnyProtoClient {
+    pub fn new<T: ProtoClient + 'static>(client: Arc<T>) -> Self {
+        Self(client)
+    }
+
+    pub fn request<T: RequestMessage>(
+        &self,
+        request: T,
+    ) -> impl Future<Output = anyhow::Result<T::Response>> {
+        let envelope = request.into_envelope(0, None, None);
+        let response = self.0.request(envelope, T::NAME);
+        async move {
+            T::Response::from_envelope(response.await?)
+                .ok_or_else(|| anyhow!("received response of the wrong type"))
+        }
+    }
+
+    pub fn send<T: EnvelopedMessage>(&self, request: T) -> anyhow::Result<()> {
+        let envelope = request.into_envelope(0, None, None);
+        self.0.send(envelope)
+    }
+}
+
 impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
     fn payload_type_id(&self) -> TypeId {
         TypeId::of::<T>()
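
The `ProtoClient` trait plus the `AnyProtoClient` wrapper are what let `worktree` and the new `BufferStore` send RPC messages without depending on the concrete `client::Client`. A minimal sketch of an implementor, using a made-up `FakeClient` that is not part of this PR:

```rust
use std::sync::Arc;

use anyhow::anyhow;
use futures::{future::BoxFuture, FutureExt as _};
use proto::{AnyProtoClient, Envelope, ProtoClient};

struct FakeClient;

impl ProtoClient for FakeClient {
    fn request(
        &self,
        _envelope: Envelope,
        request_type: &'static str,
    ) -> BoxFuture<'static, anyhow::Result<Envelope>> {
        // A real implementation forwards the envelope over a connection and
        // resolves with the response; failing keeps this sketch self-contained.
        async move { Err(anyhow!("{request_type} is not handled by FakeClient")) }.boxed()
    }

    fn send(&self, _envelope: Envelope) -> anyhow::Result<()> {
        // Fire-and-forget messages would be queued for the connection here.
        Ok(())
    }
}

// Any Arc<impl ProtoClient> converts into the type-erased handle via the
// From impl above.
fn erase(client: Arc<FakeClient>) -> AnyProtoClient {
    client.into()
}
```

Typed requests then go through `AnyProtoClient::request`, which wraps the payload in an `Envelope` and decodes the response, exactly the logic that used to live in `RemoteWorktree::rpc_request` (removed in `worktree.rs` below).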

crates/rpc/src/peer.rs 🔗

@@ -557,6 +557,14 @@ impl Peer {
         Ok(())
     }
 
+    pub fn send_dynamic(&self, receiver_id: ConnectionId, message: proto::Envelope) -> Result<()> {
+        let connection = self.connection_state(receiver_id)?;
+        connection
+            .outgoing_tx
+            .unbounded_send(proto::Message::Envelope(message))?;
+        Ok(())
+    }
+
     pub fn forward_send<T: EnvelopedMessage>(
         &self,
         sender_id: ConnectionId,
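
`send_dynamic` mirrors the typed `Peer::send` but takes an already-built `proto::Envelope`, which is what a type-erased `ProtoClient::send` implementation has to hand over. A rough usage sketch; the import paths and the choice of `UnshareProject` as a payload are assumptions, not code from this PR:

```rust
use rpc::proto::{self, EnvelopedMessage as _};
use rpc::{ConnectionId, Peer};

// Build the envelope by hand (the typed Peer::send does this internally),
// then push it onto the connection's outgoing queue.
fn send_prebuilt(peer: &Peer, connection_id: ConnectionId, project_id: u64) -> anyhow::Result<()> {
    let envelope = proto::UnshareProject { project_id }.into_envelope(0, None, None);
    peer.send_dynamic(connection_id, envelope)
}
```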

crates/worktree/src/worktree.rs 🔗

@@ -37,7 +37,7 @@ use postage::{
     prelude::{Sink as _, Stream as _},
     watch,
 };
-use rpc::proto::{self, EnvelopedMessage as _, RequestMessage};
+use rpc::proto::{self, AnyProtoClient};
 use settings::{Settings, SettingsLocation, SettingsStore};
 use smol::channel::{self, Sender};
 use std::{
@@ -131,7 +131,7 @@ pub struct RemoteWorktree {
     snapshot: Snapshot,
     background_snapshot: Arc<Mutex<Snapshot>>,
     project_id: u64,
-    client: Box<dyn RemoteWorktreeClient>,
+    client: AnyProtoClient,
     updates_tx: Option<UnboundedSender<proto::UpdateWorktree>>,
     update_observer: Arc<
         Mutex<Option<Box<dyn Send + FnMut(proto::UpdateWorktree) -> BoxFuture<'static, bool>>>>,
@@ -142,14 +142,6 @@ pub struct RemoteWorktree {
     disconnected: bool,
 }
 
-pub trait RemoteWorktreeClient {
-    fn request(
-        &self,
-        envelope: proto::Envelope,
-        request_type: &'static str,
-    ) -> BoxFuture<'static, Result<proto::Envelope>>;
-}
-
 #[derive(Clone)]
 pub struct Snapshot {
     id: WorktreeId,
@@ -461,7 +453,7 @@ impl Worktree {
         project_id: u64,
         replica_id: ReplicaId,
         worktree: proto::WorktreeMetadata,
-        client: Box<dyn RemoteWorktreeClient>,
+        client: AnyProtoClient,
         cx: &mut AppContext,
     ) -> Model<Self> {
         cx.new_model(|cx: &mut ModelContext<Self>| {
@@ -706,7 +698,7 @@ impl Worktree {
             Worktree::Local(this) => this.create_entry(path, is_directory, cx),
             Worktree::Remote(this) => {
                 let project_id = this.project_id;
-                let request = this.rpc_request(proto::CreateProjectEntry {
+                let request = this.client.request(proto::CreateProjectEntry {
                     worktree_id: worktree_id.to_proto(),
                     project_id,
                     path: path.to_string_lossy().into(),
@@ -748,7 +740,7 @@ impl Worktree {
         match self {
             Worktree::Local(this) => this.delete_entry(entry_id, trash, cx),
             Worktree::Remote(this) => {
-                let response = this.rpc_request(proto::DeleteProjectEntry {
+                let response = this.client.request(proto::DeleteProjectEntry {
                     project_id: this.project_id,
                     entry_id: entry_id.to_proto(),
                     use_trash: trash,
@@ -778,7 +770,7 @@ impl Worktree {
         match self {
             Worktree::Local(this) => this.rename_entry(entry_id, new_path, cx),
             Worktree::Remote(this) => {
-                let response = this.rpc_request(proto::RenameProjectEntry {
+                let response = this.client.request(proto::RenameProjectEntry {
                     project_id: this.project_id,
                     entry_id: entry_id.to_proto(),
                     new_path: new_path.to_string_lossy().into(),
@@ -820,7 +812,7 @@ impl Worktree {
         match self {
             Worktree::Local(this) => this.copy_entry(entry_id, new_path, cx),
             Worktree::Remote(this) => {
-                let response = this.rpc_request(proto::CopyProjectEntry {
+                let response = this.client.request(proto::CopyProjectEntry {
                     project_id: this.project_id,
                     entry_id: entry_id.to_proto(),
                     new_path: new_path.to_string_lossy().into(),
@@ -870,7 +862,7 @@ impl Worktree {
         match self {
             Worktree::Local(this) => this.expand_entry(entry_id, cx),
             Worktree::Remote(this) => {
-                let response = this.rpc_request(proto::ExpandProjectEntry {
+                let response = this.client.request(proto::ExpandProjectEntry {
                     project_id: this.project_id,
                     entry_id: entry_id.to_proto(),
                 });
@@ -1811,24 +1803,20 @@ impl LocalWorktree {
 }
 
 impl RemoteWorktree {
+    pub fn project_id(&self) -> u64 {
+        self.project_id
+    }
+
+    pub fn client(&self) -> AnyProtoClient {
+        self.client.clone()
+    }
+
     pub fn disconnected_from_host(&mut self) {
         self.updates_tx.take();
         self.snapshot_subscriptions.clear();
         self.disconnected = true;
     }
 
-    fn rpc_request<T: RequestMessage>(
-        &self,
-        request: T,
-    ) -> impl Future<Output = Result<T::Response>> {
-        let envelope = request.into_envelope(0, None, None);
-        let response = self.client.request(envelope, T::NAME);
-        async move {
-            T::Response::from_envelope(response.await?)
-                .ok_or_else(|| anyhow!("received response of the wrong type"))
-        }
-    }
-
     pub fn update_from_remote(&mut self, update: proto::UpdateWorktree) {
         if let Some(updates_tx) = &self.updates_tx {
             updates_tx