Rebuild buffer store to be aware of remote/local distinction (#18303)

Conrad Irwin and Mikayla created

Release Notes:

- N/A

---------

Co-authored-by: Mikayla <mikayla@zed.dev>

Change summary

crates/collab/src/tests/remote_editing_collaboration_tests.rs |  27 
crates/project/src/buffer_store.rs                            | 966 ++++
crates/project/src/lsp_command.rs                             |  23 
crates/project/src/lsp_store.rs                               |  33 
crates/project/src/project.rs                                 |  18 
crates/remote_server/src/remote_editing_tests.rs              |   1 
6 files changed, 989 insertions(+), 79 deletions(-)

Detailed changes

crates/collab/src/tests/remote_editing_collaboration_tests.rs 🔗

@@ -3,6 +3,7 @@ use call::ActiveCall;
 use fs::{FakeFs, Fs as _};
 use gpui::{Context as _, TestAppContext};
 use language::language_settings::all_language_settings;
+use project::ProjectPath;
 use remote::SshSession;
 use remote_server::HeadlessProject;
 use serde_json::json;
@@ -108,14 +109,36 @@ async fn test_sharing_an_ssh_remote_project(
     });
 
     project_b
-        .update(cx_b, |project, cx| project.save_buffer(buffer_b, cx))
+        .update(cx_b, |project, cx| {
+            project.save_buffer_as(
+                buffer_b.clone(),
+                ProjectPath {
+                    worktree_id: worktree_id.to_owned(),
+                    path: Arc::from(Path::new("src/renamed.rs")),
+                },
+                cx,
+            )
+        })
         .await
         .unwrap();
     assert_eq!(
         remote_fs
-            .load("/code/project1/src/lib.rs".as_ref())
+            .load("/code/project1/src/renamed.rs".as_ref())
             .await
             .unwrap(),
         "fn one() -> usize { 100 }"
     );
+    cx_b.run_until_parked();
+    cx_b.update(|cx| {
+        assert_eq!(
+            buffer_b
+                .read(cx)
+                .file()
+                .unwrap()
+                .path()
+                .to_string_lossy()
+                .to_string(),
+            "src/renamed.rs".to_string()
+        );
+    });
 }

crates/project/src/buffer_store.rs 🔗

@@ -10,7 +10,8 @@ use fs::Fs;
 use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt};
 use git::blame::Blame;
 use gpui::{
-    AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Task, WeakModel,
+    AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Subscription,
+    Task, WeakModel,
 };
 use http_client::Url;
 use language::{
@@ -25,27 +26,72 @@ use smol::channel::Receiver;
 use std::{io, path::Path, str::FromStr as _, sync::Arc, time::Instant};
 use text::BufferId;
 use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
-use worktree::{
-    File, PathChange, ProjectEntryId, RemoteWorktree, UpdatedGitRepositoriesSet, Worktree,
-    WorktreeId,
-};
+use worktree::{File, PathChange, ProjectEntryId, UpdatedGitRepositoriesSet, Worktree, WorktreeId};
 
-/// A set of open buffers.
-pub struct BufferStore {
-    state: BufferStoreState,
-    downstream_client: Option<(AnyProtoClient, u64)>,
+trait BufferStoreImpl {
+    fn open_buffer(
+        &self,
+        path: Arc<Path>,
+        worktree: Model<Worktree>,
+        cx: &mut ModelContext<BufferStore>,
+    ) -> Task<Result<Model<Buffer>>>;
+
+    fn save_buffer(
+        &self,
+        buffer: Model<Buffer>,
+        cx: &mut ModelContext<BufferStore>,
+    ) -> Task<Result<()>>;
+
+    fn save_buffer_as(
+        &self,
+        buffer: Model<Buffer>,
+        path: ProjectPath,
+        cx: &mut ModelContext<BufferStore>,
+    ) -> Task<Result<()>>;
+
+    fn create_buffer(&self, cx: &mut ModelContext<BufferStore>) -> Task<Result<Model<Buffer>>>;
+
+    fn reload_buffers(
+        &self,
+        buffers: Vec<Model<Buffer>>,
+        push_to_history: bool,
+        cx: &mut ModelContext<BufferStore>,
+    ) -> Task<Result<ProjectTransaction>>;
+
+    fn as_remote(&self) -> Option<Model<RemoteBufferStore>>;
+    fn as_local(&self) -> Option<Model<LocalBufferStore>>;
+}
+
+struct RemoteBufferStore {
+    shared_with_me: HashSet<Model<Buffer>>,
+    upstream_client: AnyProtoClient,
+    project_id: u64,
+    loading_remote_buffers_by_id: HashMap<BufferId, Model<Buffer>>,
+    remote_buffer_listeners:
+        HashMap<BufferId, Vec<oneshot::Sender<Result<Model<Buffer>, anyhow::Error>>>>,
     worktree_store: Model<WorktreeStore>,
-    opened_buffers: HashMap<BufferId, OpenBuffer>,
+    buffer_store: WeakModel<BufferStore>,
+}
+
+struct LocalBufferStore {
     local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
     local_buffer_ids_by_entry_id: HashMap<ProjectEntryId, BufferId>,
+    buffer_store: WeakModel<BufferStore>,
+    worktree_store: Model<WorktreeStore>,
+    _subscription: Subscription,
+}
+
+/// A set of open buffers.
+pub struct BufferStore {
+    state: Box<dyn BufferStoreImpl>,
     #[allow(clippy::type_complexity)]
     loading_buffers_by_path: HashMap<
         ProjectPath,
         postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
     >,
-    loading_remote_buffers_by_id: HashMap<BufferId, Model<Buffer>>,
-    remote_buffer_listeners:
-        HashMap<BufferId, Vec<oneshot::Sender<Result<Model<Buffer>, anyhow::Error>>>>,
+    worktree_store: Model<WorktreeStore>,
+    opened_buffers: HashMap<BufferId, OpenBuffer>,
+    downstream_client: Option<(AnyProtoClient, u64)>,
     shared_buffers: HashMap<proto::PeerId, HashSet<Model<Buffer>>>,
 }
 
@@ -63,19 +109,858 @@ pub enum BufferStoreEvent {
     },
 }
 
-enum BufferStoreState {
-    Remote {
-        shared_with_me: HashSet<Model<Buffer>>,
-        upstream_client: AnyProtoClient,
-        project_id: u64,
-    },
-    Local {},
+#[derive(Default, Debug)]
+pub struct ProjectTransaction(pub HashMap<Model<Buffer>, language::Transaction>);
+
+impl EventEmitter<BufferStoreEvent> for BufferStore {}
+
+impl RemoteBufferStore {
+    pub fn wait_for_remote_buffer(
+        &mut self,
+        id: BufferId,
+        cx: &mut AppContext,
+    ) -> Task<Result<Model<Buffer>>> {
+        let buffer_store = self.buffer_store.clone();
+        let (tx, rx) = oneshot::channel();
+        self.remote_buffer_listeners.entry(id).or_default().push(tx);
+
+        cx.spawn(|cx| async move {
+            if let Some(buffer) = buffer_store
+                .read_with(&cx, |buffer_store, _| buffer_store.get(id))
+                .ok()
+                .flatten()
+            {
+                return Ok(buffer);
+            }
+
+            cx.background_executor()
+                .spawn(async move { rx.await? })
+                .await
+        })
+    }
+
+    fn save_remote_buffer(
+        &self,
+        buffer_handle: Model<Buffer>,
+        new_path: Option<proto::ProjectPath>,
+        cx: &ModelContext<Self>,
+    ) -> Task<Result<()>> {
+        let buffer = buffer_handle.read(cx);
+        let buffer_id = buffer.remote_id().into();
+        let version = buffer.version();
+        let rpc = self.upstream_client.clone();
+        let project_id = self.project_id;
+        cx.spawn(move |_, mut cx| async move {
+            let response = rpc
+                .request(proto::SaveBuffer {
+                    project_id,
+                    buffer_id,
+                    new_path,
+                    version: serialize_version(&version),
+                })
+                .await?;
+            let version = deserialize_version(&response.version);
+            let mtime = response.mtime.map(|mtime| mtime.into());
+
+            buffer_handle.update(&mut cx, |buffer, cx| {
+                buffer.did_save(version.clone(), mtime, cx);
+            })?;
+
+            Ok(())
+        })
+    }
+
+    pub fn handle_create_buffer_for_peer(
+        &mut self,
+        envelope: TypedEnvelope<proto::CreateBufferForPeer>,
+        replica_id: u16,
+        capability: Capability,
+        cx: &mut ModelContext<Self>,
+    ) -> Result<Option<Model<Buffer>>> {
+        match envelope
+            .payload
+            .variant
+            .ok_or_else(|| anyhow!("missing variant"))?
+        {
+            proto::create_buffer_for_peer::Variant::State(mut state) => {
+                let buffer_id = BufferId::new(state.id)?;
+
+                let buffer_result = maybe!({
+                    let mut buffer_file = None;
+                    if let Some(file) = state.file.take() {
+                        let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
+                        let worktree = self
+                            .worktree_store
+                            .read(cx)
+                            .worktree_for_id(worktree_id, cx)
+                            .ok_or_else(|| {
+                                anyhow!("no worktree found for id {}", file.worktree_id)
+                            })?;
+                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
+                            as Arc<dyn language::File>);
+                    }
+                    Buffer::from_proto(replica_id, capability, state, buffer_file)
+                });
+
+                match buffer_result {
+                    Ok(buffer) => {
+                        let buffer = cx.new_model(|_| buffer);
+                        self.loading_remote_buffers_by_id.insert(buffer_id, buffer);
+                    }
+                    Err(error) => {
+                        if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
+                            for listener in listeners {
+                                listener.send(Err(anyhow!(error.cloned()))).ok();
+                            }
+                        }
+                    }
+                }
+            }
+            proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
+                let buffer_id = BufferId::new(chunk.buffer_id)?;
+                let buffer = self
+                    .loading_remote_buffers_by_id
+                    .get(&buffer_id)
+                    .cloned()
+                    .ok_or_else(|| {
+                        anyhow!(
+                            "received chunk for buffer {} without initial state",
+                            chunk.buffer_id
+                        )
+                    })?;
+
+                let result = maybe!({
+                    let operations = chunk
+                        .operations
+                        .into_iter()
+                        .map(language::proto::deserialize_operation)
+                        .collect::<Result<Vec<_>>>()?;
+                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx));
+                    anyhow::Ok(())
+                });
+
+                if let Err(error) = result {
+                    self.loading_remote_buffers_by_id.remove(&buffer_id);
+                    if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) {
+                        for listener in listeners {
+                            listener.send(Err(error.cloned())).ok();
+                        }
+                    }
+                } else if chunk.is_last {
+                    self.loading_remote_buffers_by_id.remove(&buffer_id);
+                    if self.upstream_client.is_via_collab() {
+                        // retain buffers sent by peers to avoid races.
+                        self.shared_with_me.insert(buffer.clone());
+                    }
+
+                    if let Some(senders) = self.remote_buffer_listeners.remove(&buffer_id) {
+                        for sender in senders {
+                            sender.send(Ok(buffer.clone())).ok();
+                        }
+                    }
+                    return Ok(Some(buffer));
+                }
+            }
+        }
+        return Ok(None);
+    }
+
+    pub fn incomplete_buffer_ids(&self) -> Vec<BufferId> {
+        self.loading_remote_buffers_by_id
+            .keys()
+            .copied()
+            .collect::<Vec<_>>()
+    }
+
+    pub fn deserialize_project_transaction(
+        &self,
+        message: proto::ProjectTransaction,
+        push_to_history: bool,
+        cx: &mut ModelContext<Self>,
+    ) -> Task<Result<ProjectTransaction>> {
+        cx.spawn(|this, mut cx| async move {
+            let mut project_transaction = ProjectTransaction::default();
+            for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions)
+            {
+                let buffer_id = BufferId::new(buffer_id)?;
+                let buffer = this
+                    .update(&mut cx, |this, cx| {
+                        this.wait_for_remote_buffer(buffer_id, cx)
+                    })?
+                    .await?;
+                let transaction = language::proto::deserialize_transaction(transaction)?;
+                project_transaction.0.insert(buffer, transaction);
+            }
+
+            for (buffer, transaction) in &project_transaction.0 {
+                buffer
+                    .update(&mut cx, |buffer, _| {
+                        buffer.wait_for_edits(transaction.edit_ids.iter().copied())
+                    })?
+                    .await?;
+
+                if push_to_history {
+                    buffer.update(&mut cx, |buffer, _| {
+                        buffer.push_transaction(transaction.clone(), Instant::now());
+                    })?;
+                }
+            }
+
+            Ok(project_transaction)
+        })
+    }
 }
 
-#[derive(Default, Debug)]
-pub struct ProjectTransaction(pub HashMap<Model<Buffer>, language::Transaction>);
+impl BufferStoreImpl for Model<RemoteBufferStore> {
+    fn as_remote(&self) -> Option<Model<RemoteBufferStore>> {
+        Some(self.clone())
+    }
+
+    fn as_local(&self) -> Option<Model<LocalBufferStore>> {
+        None
+    }
+
+    fn save_buffer(
+        &self,
+        buffer: Model<Buffer>,
+        cx: &mut ModelContext<BufferStore>,
+    ) -> Task<Result<()>> {
+        self.update(cx, |this, cx| {
+            this.save_remote_buffer(buffer.clone(), None, cx)
+        })
+    }
+    fn save_buffer_as(
+        &self,
+        buffer: Model<Buffer>,
+        path: ProjectPath,
+        cx: &mut ModelContext<BufferStore>,
+    ) -> Task<Result<()>> {
+        self.update(cx, |this, cx| {
+            this.save_remote_buffer(buffer, Some(path.to_proto()), cx)
+        })
+    }
+
+    fn open_buffer(
+        &self,
+        path: Arc<Path>,
+        worktree: Model<Worktree>,
+        cx: &mut ModelContext<BufferStore>,
+    ) -> Task<Result<Model<Buffer>>> {
+        self.update(cx, |this, cx| {
+            let worktree_id = worktree.read(cx).id().to_proto();
+            let project_id = this.project_id;
+            let client = this.upstream_client.clone();
+            let path_string = path.clone().to_string_lossy().to_string();
+            cx.spawn(move |this, mut cx| async move {
+                let response = client
+                    .request(proto::OpenBufferByPath {
+                        project_id,
+                        worktree_id,
+                        path: path_string,
+                    })
+                    .await?;
+                let buffer_id = BufferId::new(response.buffer_id)?;
+
+                let buffer = this
+                    .update(&mut cx, {
+                        |this, cx| this.wait_for_remote_buffer(buffer_id, cx)
+                    })?
+                    .await?;
+
+                Ok(buffer)
+            })
+        })
+    }
+
+    fn create_buffer(&self, cx: &mut ModelContext<BufferStore>) -> Task<Result<Model<Buffer>>> {
+        self.update(cx, |this, cx| {
+            let create = this.upstream_client.request(proto::OpenNewBuffer {
+                project_id: this.project_id,
+            });
+            cx.spawn(|this, mut cx| async move {
+                let response = create.await?;
+                let buffer_id = BufferId::new(response.buffer_id)?;
+
+                this.update(&mut cx, |this, cx| {
+                    this.wait_for_remote_buffer(buffer_id, cx)
+                })?
+                .await
+            })
+        })
+    }
+
+    fn reload_buffers(
+        &self,
+        buffers: Vec<Model<Buffer>>,
+        push_to_history: bool,
+        cx: &mut ModelContext<BufferStore>,
+    ) -> Task<Result<ProjectTransaction>> {
+        self.update(cx, |this, cx| {
+            let request = this.upstream_client.request(proto::ReloadBuffers {
+                project_id: this.project_id,
+                buffer_ids: buffers
+                    .iter()
+                    .map(|buffer| buffer.read(cx).remote_id().to_proto())
+                    .collect(),
+            });
+
+            cx.spawn(|this, mut cx| async move {
+                let response = request
+                    .await?
+                    .transaction
+                    .ok_or_else(|| anyhow!("missing transaction"))?;
+                this.update(&mut cx, |this, cx| {
+                    this.deserialize_project_transaction(response, push_to_history, cx)
+                })?
+                .await
+            })
+        })
+    }
+}
+
+impl LocalBufferStore {
+    fn save_local_buffer(
+        &self,
+        buffer_handle: Model<Buffer>,
+        worktree: Model<Worktree>,
+        path: Arc<Path>,
+        mut has_changed_file: bool,
+        cx: &mut ModelContext<Self>,
+    ) -> Task<Result<()>> {
+        let buffer = buffer_handle.read(cx);
+
+        let text = buffer.as_rope().clone();
+        let line_ending = buffer.line_ending();
+        let version = buffer.version();
+        let buffer_id = buffer.remote_id();
+        if buffer.file().is_some_and(|file| !file.is_created()) {
+            has_changed_file = true;
+        }
+
+        let save = worktree.update(cx, |worktree, cx| {
+            worktree.write_file(path.as_ref(), text, line_ending, cx)
+        });
+
+        cx.spawn(move |this, mut cx| async move {
+            let new_file = save.await?;
+            let mtime = new_file.mtime;
+            this.update(&mut cx, |this, cx| {
+                if let Some((downstream_client, project_id)) = this.downstream_client(cx) {
+                    if has_changed_file {
+                        downstream_client
+                            .send(proto::UpdateBufferFile {
+                                project_id,
+                                buffer_id: buffer_id.to_proto(),
+                                file: Some(language::File::to_proto(&*new_file, cx)),
+                            })
+                            .log_err();
+                    }
+                    downstream_client
+                        .send(proto::BufferSaved {
+                            project_id,
+                            buffer_id: buffer_id.to_proto(),
+                            version: serialize_version(&version),
+                            mtime: mtime.map(|time| time.into()),
+                        })
+                        .log_err();
+                }
+            })?;
+            buffer_handle.update(&mut cx, |buffer, cx| {
+                if has_changed_file {
+                    buffer.file_updated(new_file, cx);
+                }
+                buffer.did_save(version.clone(), mtime, cx);
+            })
+        })
+    }
+
+    fn subscribe_to_worktree(&mut self, worktree: &Model<Worktree>, cx: &mut ModelContext<Self>) {
+        cx.subscribe(worktree, |this, worktree, event, cx| {
+            if worktree.read(cx).is_local() {
+                match event {
+                    worktree::Event::UpdatedEntries(changes) => {
+                        this.local_worktree_entries_changed(&worktree, changes, cx);
+                    }
+                    worktree::Event::UpdatedGitRepositories(updated_repos) => {
+                        this.local_worktree_git_repos_changed(worktree.clone(), updated_repos, cx)
+                    }
+                    _ => {}
+                }
+            }
+        })
+        .detach();
+    }
+
+    fn local_worktree_entries_changed(
+        &mut self,
+        worktree_handle: &Model<Worktree>,
+        changes: &[(Arc<Path>, ProjectEntryId, PathChange)],
+        cx: &mut ModelContext<Self>,
+    ) {
+        let snapshot = worktree_handle.read(cx).snapshot();
+        for (path, entry_id, _) in changes {
+            self.local_worktree_entry_changed(*entry_id, path, worktree_handle, &snapshot, cx);
+        }
+    }
+
+    fn local_worktree_git_repos_changed(
+        &mut self,
+        worktree_handle: Model<Worktree>,
+        changed_repos: &UpdatedGitRepositoriesSet,
+        cx: &mut ModelContext<Self>,
+    ) {
+        debug_assert!(worktree_handle.read(cx).is_local());
+        let Some(buffer_store) = self.buffer_store.upgrade() else {
+            return;
+        };
+
+        // Identify the loading buffers whose containing repository that has changed.
+        let (future_buffers, current_buffers) = buffer_store.update(cx, |buffer_store, cx| {
+            let future_buffers = buffer_store
+                .loading_buffers()
+                .filter_map(|(project_path, receiver)| {
+                    if project_path.worktree_id != worktree_handle.read(cx).id() {
+                        return None;
+                    }
+                    let path = &project_path.path;
+                    changed_repos
+                        .iter()
+                        .find(|(work_dir, _)| path.starts_with(work_dir))?;
+                    let path = path.clone();
+                    Some(async move {
+                        BufferStore::wait_for_loading_buffer(receiver)
+                            .await
+                            .ok()
+                            .map(|buffer| (buffer, path))
+                    })
+                })
+                .collect::<FuturesUnordered<_>>();
+
+            // Identify the current buffers whose containing repository has changed.
+            let current_buffers = buffer_store
+                .buffers()
+                .filter_map(|buffer| {
+                    let file = File::from_dyn(buffer.read(cx).file())?;
+                    if file.worktree != worktree_handle {
+                        return None;
+                    }
+                    changed_repos
+                        .iter()
+                        .find(|(work_dir, _)| file.path.starts_with(work_dir))?;
+                    Some((buffer, file.path.clone()))
+                })
+                .collect::<Vec<_>>();
+            (future_buffers, current_buffers)
+        });
+
+        if future_buffers.len() + current_buffers.len() == 0 {
+            return;
+        }
+
+        cx.spawn(move |this, mut cx| async move {
+            // Wait for all of the buffers to load.
+            let future_buffers = future_buffers.collect::<Vec<_>>().await;
+
+            // Reload the diff base for every buffer whose containing git repository has changed.
+            let snapshot =
+                worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?;
+            let diff_bases_by_buffer = cx
+                .background_executor()
+                .spawn(async move {
+                    let mut diff_base_tasks = future_buffers
+                        .into_iter()
+                        .flatten()
+                        .chain(current_buffers)
+                        .filter_map(|(buffer, path)| {
+                            let (repo_entry, local_repo_entry) = snapshot.repo_for_path(&path)?;
+                            let relative_path = repo_entry.relativize(&snapshot, &path).ok()?;
+                            Some(async move {
+                                let base_text =
+                                    local_repo_entry.repo().load_index_text(&relative_path);
+                                Some((buffer, base_text))
+                            })
+                        })
+                        .collect::<FuturesUnordered<_>>();
+
+                    let mut diff_bases = Vec::with_capacity(diff_base_tasks.len());
+                    while let Some(diff_base) = diff_base_tasks.next().await {
+                        if let Some(diff_base) = diff_base {
+                            diff_bases.push(diff_base);
+                        }
+                    }
+                    diff_bases
+                })
+                .await;
+
+            this.update(&mut cx, |this, cx| {
+                // Assign the new diff bases on all of the buffers.
+                for (buffer, diff_base) in diff_bases_by_buffer {
+                    let buffer_id = buffer.update(cx, |buffer, cx| {
+                        buffer.set_diff_base(diff_base.clone(), cx);
+                        buffer.remote_id().to_proto()
+                    });
+                    if let Some((client, project_id)) = &this.downstream_client(cx) {
+                        client
+                            .send(proto::UpdateDiffBase {
+                                project_id: *project_id,
+                                buffer_id,
+                                diff_base,
+                            })
+                            .log_err();
+                    }
+                }
+            })
+        })
+        .detach_and_log_err(cx);
+    }
+
+    fn local_worktree_entry_changed(
+        &mut self,
+        entry_id: ProjectEntryId,
+        path: &Arc<Path>,
+        worktree: &Model<worktree::Worktree>,
+        snapshot: &worktree::Snapshot,
+        cx: &mut ModelContext<Self>,
+    ) -> Option<()> {
+        let project_path = ProjectPath {
+            worktree_id: snapshot.id(),
+            path: path.clone(),
+        };
+        let buffer_id = match self.local_buffer_ids_by_entry_id.get(&entry_id) {
+            Some(&buffer_id) => buffer_id,
+            None => self.local_buffer_ids_by_path.get(&project_path).copied()?,
+        };
+        let buffer = self
+            .buffer_store
+            .update(cx, |buffer_store, _| {
+                if let Some(buffer) = buffer_store.get(buffer_id) {
+                    Some(buffer)
+                } else {
+                    buffer_store.opened_buffers.remove(&buffer_id);
+                    None
+                }
+            })
+            .ok()
+            .flatten();
+        let buffer = if let Some(buffer) = buffer {
+            buffer
+        } else {
+            self.local_buffer_ids_by_path.remove(&project_path);
+            self.local_buffer_ids_by_entry_id.remove(&entry_id);
+            return None;
+        };
+
+        let events = buffer.update(cx, |buffer, cx| {
+            let file = buffer.file()?;
+            let old_file = File::from_dyn(Some(file))?;
+            if old_file.worktree != *worktree {
+                return None;
+            }
+
+            let new_file = if let Some(entry) = old_file
+                .entry_id
+                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
+            {
+                File {
+                    is_local: true,
+                    entry_id: Some(entry.id),
+                    mtime: entry.mtime,
+                    path: entry.path.clone(),
+                    worktree: worktree.clone(),
+                    is_deleted: false,
+                    is_private: entry.is_private,
+                }
+            } else if let Some(entry) = snapshot.entry_for_path(old_file.path.as_ref()) {
+                File {
+                    is_local: true,
+                    entry_id: Some(entry.id),
+                    mtime: entry.mtime,
+                    path: entry.path.clone(),
+                    worktree: worktree.clone(),
+                    is_deleted: false,
+                    is_private: entry.is_private,
+                }
+            } else {
+                File {
+                    is_local: true,
+                    entry_id: old_file.entry_id,
+                    path: old_file.path.clone(),
+                    mtime: old_file.mtime,
+                    worktree: worktree.clone(),
+                    is_deleted: true,
+                    is_private: old_file.is_private,
+                }
+            };
+
+            if new_file == *old_file {
+                return None;
+            }
+
+            let mut events = Vec::new();
+            if new_file.path != old_file.path {
+                self.local_buffer_ids_by_path.remove(&ProjectPath {
+                    path: old_file.path.clone(),
+                    worktree_id: old_file.worktree_id(cx),
+                });
+                self.local_buffer_ids_by_path.insert(
+                    ProjectPath {
+                        worktree_id: new_file.worktree_id(cx),
+                        path: new_file.path.clone(),
+                    },
+                    buffer_id,
+                );
+                events.push(BufferStoreEvent::BufferChangedFilePath {
+                    buffer: cx.handle(),
+                    old_file: buffer.file().cloned(),
+                });
+            }
+
+            if new_file.entry_id != old_file.entry_id {
+                if let Some(entry_id) = old_file.entry_id {
+                    self.local_buffer_ids_by_entry_id.remove(&entry_id);
+                }
+                if let Some(entry_id) = new_file.entry_id {
+                    self.local_buffer_ids_by_entry_id
+                        .insert(entry_id, buffer_id);
+                }
+            }
+
+            if let Some((client, project_id)) = &self.downstream_client(cx) {
+                client
+                    .send(proto::UpdateBufferFile {
+                        project_id: *project_id,
+                        buffer_id: buffer_id.to_proto(),
+                        file: Some(new_file.to_proto(cx)),
+                    })
+                    .ok();
+            }
+
+            buffer.file_updated(Arc::new(new_file), cx);
+            Some(events)
+        })?;
+        self.buffer_store
+            .update(cx, |_buffer_store, cx| {
+                for event in events {
+                    cx.emit(event);
+                }
+            })
+            .log_err()?;
+
+        None
+    }
+
+    fn downstream_client(&self, cx: &AppContext) -> Option<(AnyProtoClient, u64)> {
+        self.buffer_store
+            .upgrade()?
+            .read(cx)
+            .downstream_client
+            .clone()
+    }
+
+    fn buffer_changed_file(&mut self, buffer: Model<Buffer>, cx: &mut AppContext) -> Option<()> {
+        let file = File::from_dyn(buffer.read(cx).file())?;
+
+        let remote_id = buffer.read(cx).remote_id();
+        if let Some(entry_id) = file.entry_id {
+            match self.local_buffer_ids_by_entry_id.get(&entry_id) {
+                Some(_) => {
+                    return None;
+                }
+                None => {
+                    self.local_buffer_ids_by_entry_id
+                        .insert(entry_id, remote_id);
+                }
+            }
+        };
+        self.local_buffer_ids_by_path.insert(
+            ProjectPath {
+                worktree_id: file.worktree_id(cx),
+                path: file.path.clone(),
+            },
+            remote_id,
+        );
+
+        Some(())
+    }
+}
+
+impl BufferStoreImpl for Model<LocalBufferStore> {
+    fn as_remote(&self) -> Option<Model<RemoteBufferStore>> {
+        None
+    }
+
+    fn as_local(&self) -> Option<Model<LocalBufferStore>> {
+        Some(self.clone())
+    }
+
+    fn save_buffer(
+        &self,
+        buffer: Model<Buffer>,
+        cx: &mut ModelContext<BufferStore>,
+    ) -> Task<Result<()>> {
+        self.update(cx, |this, cx| {
+            let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
+                return Task::ready(Err(anyhow!("buffer doesn't have a file")));
+            };
+            let worktree = file.worktree.clone();
+            this.save_local_buffer(buffer, worktree, file.path.clone(), false, cx)
+        })
+    }
+
+    fn save_buffer_as(
+        &self,
+        buffer: Model<Buffer>,
+        path: ProjectPath,
+        cx: &mut ModelContext<BufferStore>,
+    ) -> Task<Result<()>> {
+        self.update(cx, |this, cx| {
+            let Some(worktree) = this
+                .worktree_store
+                .read(cx)
+                .worktree_for_id(path.worktree_id, cx)
+            else {
+                return Task::ready(Err(anyhow!("no such worktree")));
+            };
+            this.save_local_buffer(buffer, worktree, path.path.clone(), true, cx)
+        })
+    }
+
+    fn open_buffer(
+        &self,
+        path: Arc<Path>,
+        worktree: Model<Worktree>,
+        cx: &mut ModelContext<BufferStore>,
+    ) -> Task<Result<Model<Buffer>>> {
+        let buffer_store = cx.weak_model();
+        self.update(cx, |_, cx| {
+            let load_buffer = worktree.update(cx, |worktree, cx| {
+                let load_file = worktree.load_file(path.as_ref(), cx);
+                let reservation = cx.reserve_model();
+                let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64());
+                cx.spawn(move |_, mut cx| async move {
+                    let loaded = load_file.await?;
+                    let text_buffer = cx
+                        .background_executor()
+                        .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
+                        .await;
+                    cx.insert_model(reservation, |_| {
+                        Buffer::build(
+                            text_buffer,
+                            loaded.diff_base,
+                            Some(loaded.file),
+                            Capability::ReadWrite,
+                        )
+                    })
+                })
+            });
+
+            cx.spawn(move |this, mut cx| async move {
+                let buffer = match load_buffer.await {
+                    Ok(buffer) => Ok(buffer),
+                    Err(error) if is_not_found_error(&error) => cx.new_model(|cx| {
+                        let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
+                        let text_buffer = text::Buffer::new(0, buffer_id, "".into());
+                        Buffer::build(
+                            text_buffer,
+                            None,
+                            Some(Arc::new(File {
+                                worktree,
+                                path,
+                                mtime: None,
+                                entry_id: None,
+                                is_local: true,
+                                is_deleted: false,
+                                is_private: false,
+                            })),
+                            Capability::ReadWrite,
+                        )
+                    }),
+                    Err(e) => Err(e),
+                }?;
+                this.update(&mut cx, |this, cx| {
+                    buffer_store.update(cx, |buffer_store, cx| {
+                        buffer_store.add_buffer(buffer.clone(), cx)
+                    })??;
+                    let buffer_id = buffer.read(cx).remote_id();
+                    if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
+                        this.local_buffer_ids_by_path.insert(
+                            ProjectPath {
+                                worktree_id: file.worktree_id(cx),
+                                path: file.path.clone(),
+                            },
+                            buffer_id,
+                        );
+
+                        if let Some(entry_id) = file.entry_id {
+                            this.local_buffer_ids_by_entry_id
+                                .insert(entry_id, buffer_id);
+                        }
+                    }
+
+                    anyhow::Ok(())
+                })??;
+
+                Ok(buffer)
+            })
+        })
+    }
+
+    fn create_buffer(&self, cx: &mut ModelContext<BufferStore>) -> Task<Result<Model<Buffer>>> {
+        let handle = self.clone();
+        cx.spawn(|buffer_store, mut cx| async move {
+            let buffer = cx.new_model(|cx| {
+                Buffer::local("", cx).with_language(language::PLAIN_TEXT.clone(), cx)
+            })?;
+            buffer_store.update(&mut cx, |buffer_store, cx| {
+                buffer_store.add_buffer(buffer.clone(), cx).log_err();
+                let buffer_id = buffer.read(cx).remote_id();
+                handle.update(cx, |this, cx| {
+                    if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
+                        this.local_buffer_ids_by_path.insert(
+                            ProjectPath {
+                                worktree_id: file.worktree_id(cx),
+                                path: file.path.clone(),
+                            },
+                            buffer_id,
+                        );
+
+                        if let Some(entry_id) = file.entry_id {
+                            this.local_buffer_ids_by_entry_id
+                                .insert(entry_id, buffer_id);
+                        }
+                    }
+                });
+            })?;
+            Ok(buffer)
+        })
+    }
+
+    fn reload_buffers(
+        &self,
+        buffers: Vec<Model<Buffer>>,
+        push_to_history: bool,
+        cx: &mut ModelContext<BufferStore>,
+    ) -> Task<Result<ProjectTransaction>> {
+        cx.spawn(move |_, mut cx| async move {
+            let mut project_transaction = ProjectTransaction::default();
+            for buffer in buffers {
+                let transaction = buffer
+                    .update(&mut cx, |buffer, cx| buffer.reload(cx))?
+                    .await?;
+                buffer.update(&mut cx, |buffer, cx| {
+                    if let Some(transaction) = transaction {
+                        if !push_to_history {
+                            buffer.forget_transaction(transaction.id);
+                        }
+                        project_transaction.0.insert(cx.handle(), transaction);
+                    }
+                })?;
+            }
 
-impl EventEmitter<BufferStoreEvent> for BufferStore {}
+            Ok(project_transaction)
+        })
+    }
+}
 
 impl BufferStore {
     pub fn init(client: &AnyProtoClient) {
@@ -90,24 +975,31 @@ impl BufferStore {
 
     /// Creates a buffer store, optionally retaining its buffers.
     pub fn local(worktree_store: Model<WorktreeStore>, cx: &mut ModelContext<Self>) -> Self {
-        cx.subscribe(&worktree_store, |this, _, event, cx| {
-            if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
-                this.subscribe_to_worktree(worktree, cx);
-            }
-        })
-        .detach();
-
+        let this = cx.weak_model();
         Self {
-            state: BufferStoreState::Local {},
+            state: Box::new(cx.new_model(|cx| {
+                let subscription = cx.subscribe(
+                    &worktree_store,
+                    |this: &mut LocalBufferStore, _, event, cx| {
+                        if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
+                            this.subscribe_to_worktree(worktree, cx);
+                        }
+                    },
+                );
+
+                LocalBufferStore {
+                    local_buffer_ids_by_path: Default::default(),
+                    local_buffer_ids_by_entry_id: Default::default(),
+                    buffer_store: this,
+                    worktree_store: worktree_store.clone(),
+                    _subscription: subscription,
+                }
+            })),
             downstream_client: None,
-            worktree_store,
             opened_buffers: Default::default(),
-            remote_buffer_listeners: Default::default(),
-            loading_remote_buffers_by_id: Default::default(),
-            local_buffer_ids_by_path: Default::default(),
-            local_buffer_ids_by_entry_id: Default::default(),
-            loading_buffers_by_path: Default::default(),
             shared_buffers: Default::default(),
+            loading_buffers_by_path: Default::default(),
+            worktree_store,
         }
     }
 

crates/project/src/lsp_command.rs 🔗

@@ -1,10 +1,9 @@
 mod signature_help;
 
 use crate::{
-    buffer_store::BufferStore, lsp_store::LspStore, CodeAction, CoreCompletion, DocumentHighlight,
-    Hover, HoverBlock, HoverBlockKind, InlayHint, InlayHintLabel, InlayHintLabelPart,
-    InlayHintLabelPartTooltip, InlayHintTooltip, Location, LocationLink, MarkupContent,
-    ProjectTransaction, ResolveState,
+    lsp_store::LspStore, CodeAction, CoreCompletion, DocumentHighlight, Hover, HoverBlock,
+    HoverBlockKind, InlayHint, InlayHintLabel, InlayHintLabelPart, InlayHintLabelPartTooltip,
+    InlayHintTooltip, Location, LocationLink, MarkupContent, ProjectTransaction, ResolveState,
 };
 use anyhow::{anyhow, Context, Result};
 use async_trait::async_trait;
@@ -417,18 +416,18 @@ impl LspCommand for PerformRename {
         message: proto::PerformRenameResponse,
         lsp_store: Model<LspStore>,
         _: Model<Buffer>,
-        cx: AsyncAppContext,
+        mut cx: AsyncAppContext,
     ) -> Result<ProjectTransaction> {
         let message = message
             .transaction
             .ok_or_else(|| anyhow!("missing transaction"))?;
-        BufferStore::deserialize_project_transaction(
-            lsp_store.read_with(&cx, |lsp_store, _| lsp_store.buffer_store().downgrade())?,
-            message,
-            self.push_to_history,
-            cx,
-        )
-        .await
+        lsp_store
+            .update(&mut cx, |lsp_store, cx| {
+                lsp_store.buffer_store().update(cx, |buffer_store, cx| {
+                    buffer_store.deserialize_project_transaction(message, self.push_to_history, cx)
+                })
+            })?
+            .await
     }
 
     fn buffer_id_from_proto(message: &proto::PerformRename) -> Result<BufferId> {

crates/project/src/lsp_store.rs 🔗

@@ -1601,19 +1601,19 @@ impl LspStore {
                 buffer_id: buffer_handle.read(cx).remote_id().into(),
                 action: Some(Self::serialize_code_action(&action)),
             };
-            cx.spawn(move |this, cx| async move {
+            let buffer_store = self.buffer_store();
+            cx.spawn(move |_, mut cx| async move {
                 let response = upstream_client
                     .request(request)
                     .await?
                     .transaction
                     .ok_or_else(|| anyhow!("missing transaction"))?;
-                BufferStore::deserialize_project_transaction(
-                    this.read_with(&cx, |this, _| this.buffer_store.downgrade())?,
-                    response,
-                    push_to_history,
-                    cx,
-                )
-                .await
+
+                buffer_store
+                    .update(&mut cx, |buffer_store, cx| {
+                        buffer_store.deserialize_project_transaction(response, push_to_history, cx)
+                    })?
+                    .await
             })
         } else {
             let buffer = buffer_handle.read(cx);
@@ -5062,6 +5062,7 @@ impl LspStore {
                 .spawn(this.languages.language_for_name(language_name.0.as_ref()))
                 .detach();
 
+            // host
             let adapter = this.languages.get_or_register_lsp_adapter(
                 language_name.clone(),
                 server_name.clone(),
@@ -5259,7 +5260,8 @@ impl LspStore {
                 result
             })
         } else if let Some((client, project_id)) = self.upstream_client() {
-            cx.spawn(move |this, mut cx| async move {
+            let buffer_store = self.buffer_store();
+            cx.spawn(move |_, mut cx| async move {
                 let response = client
                     .request(proto::FormatBuffers {
                         project_id,
@@ -5274,13 +5276,12 @@ impl LspStore {
                     .await?
                     .transaction
                     .ok_or_else(|| anyhow!("missing transaction"))?;
-                BufferStore::deserialize_project_transaction(
-                    this.read_with(&cx, |this, _| this.buffer_store.downgrade())?,
-                    response,
-                    push_to_history,
-                    cx,
-                )
-                .await
+
+                buffer_store
+                    .update(&mut cx, |buffer_store, cx| {
+                        buffer_store.deserialize_project_transaction(response, push_to_history, cx)
+                    })?
+                    .await
             })
         } else {
             Task::ready(Ok(ProjectTransaction::default()))

crates/project/src/project.rs 🔗

@@ -1667,16 +1667,8 @@ impl Project {
     }
 
     pub fn create_buffer(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<Model<Buffer>>> {
-        self.buffer_store.update(cx, |buffer_store, cx| {
-            buffer_store.create_buffer(
-                if self.is_via_collab() {
-                    Some((self.client.clone().into(), self.remote_id().unwrap()))
-                } else {
-                    None
-                },
-                cx,
-            )
-        })
+        self.buffer_store
+            .update(cx, |buffer_store, cx| buffer_store.create_buffer(cx))
     }
 
     pub fn create_local_buffer(
@@ -1685,7 +1677,7 @@ impl Project {
         language: Option<Arc<Language>>,
         cx: &mut ModelContext<Self>,
     ) -> Model<Buffer> {
-        if self.is_via_collab() {
+        if self.is_via_collab() || self.is_via_ssh() {
             panic!("called create_local_buffer on a remote project")
         }
         self.buffer_store.update(cx, |buffer_store, cx| {
@@ -3770,7 +3762,9 @@ impl Project {
         envelope: TypedEnvelope<proto::OpenNewBuffer>,
         mut cx: AsyncAppContext,
     ) -> Result<proto::OpenBufferResponse> {
-        let buffer = this.update(&mut cx, |this, cx| this.create_local_buffer("", None, cx))?;
+        let buffer = this
+            .update(&mut cx, |this, cx| this.create_buffer(cx))?
+            .await?;
         let peer_id = envelope.original_sender_id()?;
 
         Project::respond_to_open_buffer_request(this, buffer, peer_id, &mut cx)

crates/remote_server/src/remote_editing_tests.rs 🔗

@@ -56,6 +56,7 @@ async fn test_basic_remote_editing(cx: &mut TestAppContext, server_cx: &mut Test
         })
         .await
         .unwrap();
+
     buffer.update(cx, |buffer, cx| {
         assert_eq!(buffer.text(), "fn one() -> usize { 1 }");
         assert_eq!(