Make git blame work for SSH remote projects (#15106)

Created by Max Brunsfeld

This also refactors the BufferStore + WorktreeStore interfaces to make
them cleaner, more fully encapsulating the RPC aspects of their
functionality.

Release Notes:

- N/A

Change summary

crates/client/src/client.rs                  |   2 
crates/project/src/buffer_store.rs           | 462 ++++++++++++++++----
crates/project/src/project.rs                | 490 +++------------------
crates/project/src/project_tests.rs          |   1 
crates/proto/src/proto.rs                    |   4 
crates/remote_server/src/headless_project.rs |  58 --
6 files changed, 451 insertions(+), 566 deletions(-)
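
The registration side changes accordingly: RPC handlers for buffer and worktree messages now bind directly to the `BufferStore` and `WorktreeStore` models rather than being proxied through `Project`. A condensed excerpt of the pattern, taken verbatim from the project.rs and headless_project.rs diffs below:

```rust
// Collab client (project.rs): handlers are registered on the stores themselves.
client.add_model_request_handler(WorktreeStore::handle_rename_project_entry);
client.add_model_request_handler(BufferStore::handle_save_buffer);
client.add_model_request_handler(BufferStore::handle_blame_buffer);
client.add_model_message_handler(BufferStore::handle_update_diff_base);

// Headless SSH server (headless_project.rs): handlers bind to the store model directly.
session.add_request_handler(buffer_store.downgrade(), BufferStore::handle_blame_buffer);
session.add_request_handler(buffer_store.downgrade(), BufferStore::handle_save_buffer);
```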

Detailed changes

crates/client/src/client.rs 🔗

@@ -1410,7 +1410,7 @@ impl Client {
         self.peer.send(self.connection_id()?, message)
     }
 
-    fn send_dynamic(&self, envelope: proto::Envelope) -> Result<()> {
+    pub fn send_dynamic(&self, envelope: proto::Envelope) -> Result<()> {
         let connection_id = self.connection_id()?;
         self.peer.send_dynamic(connection_id, envelope)
     }

crates/project/src/buffer_store.rs 🔗

@@ -1,31 +1,34 @@
 use crate::{
     worktree_store::{WorktreeStore, WorktreeStoreEvent},
-    ProjectPath,
+    NoRepositoryError, ProjectPath,
 };
-use anyhow::{anyhow, Result};
+use anyhow::{anyhow, Context as _, Result};
 use collections::{hash_map, HashMap};
 use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt as _};
+use git::blame::Blame;
 use gpui::{
     AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Task, WeakModel,
 };
+use http_client::Url;
 use language::{
     proto::{deserialize_line_ending, deserialize_version, serialize_version, split_operations},
-    Buffer, Capability, Event as BufferEvent, Language, Operation,
+    Buffer, Capability, Event as BufferEvent, File as _, Language, Operation,
 };
 use rpc::{
-    proto::{self, AnyProtoClient, PeerId},
+    proto::{self, AnyProtoClient, EnvelopedMessage, PeerId},
     ErrorExt as _, TypedEnvelope,
 };
-use std::{io, path::Path, sync::Arc};
+use std::{io, path::Path, str::FromStr as _, sync::Arc};
 use text::BufferId;
 use util::{debug_panic, maybe, ResultExt as _};
 use worktree::{
     File, PathChange, ProjectEntryId, RemoteWorktree, UpdatedGitRepositoriesSet, Worktree,
+    WorktreeId,
 };
 
 /// A set of open buffers.
 pub struct BufferStore {
-    retain_buffers: bool,
+    remote_id: Option<u64>,
     #[allow(unused)]
     worktree_store: Model<WorktreeStore>,
     opened_buffers: HashMap<BufferId, OpenBuffer>,
@@ -51,19 +54,9 @@ pub enum BufferStoreEvent {
     BufferAdded(Model<Buffer>),
     BufferChangedFilePath {
         buffer: Model<Buffer>,
-        old_file: Option<Arc<File>>,
-    },
-    BufferSaved {
-        buffer: Model<Buffer>,
-        has_changed_file: bool,
-        saved_version: clock::Global,
-    },
-    LocalBufferUpdated {
-        buffer: Model<Buffer>,
-    },
-    DiffBaseUpdated {
-        buffer: Model<Buffer>,
+        old_file: Option<Arc<dyn language::File>>,
     },
+    MessageToReplicas(Box<proto::Envelope>),
 }
 
 impl EventEmitter<BufferStoreEvent> for BufferStore {}
@@ -77,7 +70,7 @@ impl BufferStore {
     /// weak handles.
     pub fn new(
         worktree_store: Model<WorktreeStore>,
-        retain_buffers: bool,
+        remote_id: Option<u64>,
         cx: &mut ModelContext<Self>,
     ) -> Self {
         cx.subscribe(&worktree_store, |this, _, event, cx| match event {
@@ -89,7 +82,7 @@ impl BufferStore {
         .detach();
 
         Self {
-            retain_buffers,
+            remote_id,
             worktree_store,
             opened_buffers: Default::default(),
             remote_buffer_listeners: Default::default(),
@@ -272,13 +265,23 @@ impl BufferStore {
                 })
                 .await;
 
-            this.update(&mut cx, |_, cx| {
+            this.update(&mut cx, |this, cx| {
                 // Assign the new diff bases on all of the buffers.
                 for (buffer, diff_base) in diff_bases_by_buffer {
-                    buffer.update(cx, |buffer, cx| {
+                    let buffer_id = buffer.update(cx, |buffer, cx| {
                         buffer.set_diff_base(diff_base.clone(), cx);
+                        buffer.remote_id().to_proto()
                     });
-                    cx.emit(BufferStoreEvent::DiffBaseUpdated { buffer })
+                    if let Some(project_id) = this.remote_id {
+                        cx.emit(BufferStoreEvent::MessageToReplicas(Box::new(
+                            proto::UpdateDiffBase {
+                                project_id,
+                                buffer_id,
+                                diff_base,
+                            }
+                            .into_envelope(0, None, None),
+                        )))
+                    }
                 }
             })
         })
@@ -433,9 +436,7 @@ impl BufferStore {
             return Task::ready(Err(anyhow!("no such worktree")));
         };
 
-        let old_file = File::from_dyn(buffer.read(cx).file())
-            .cloned()
-            .map(Arc::new);
+        let old_file = buffer.read(cx).file().cloned();
 
         let task = match worktree.read(cx) {
             Worktree::Local(_) => {
@@ -465,6 +466,7 @@ impl BufferStore {
         let text = buffer.as_rope().clone();
         let line_ending = buffer.line_ending();
         let version = buffer.version();
+        let buffer_id = buffer.remote_id();
         if buffer.file().is_some_and(|file| !file.is_created()) {
             has_changed_file = true;
         }
@@ -476,20 +478,35 @@ impl BufferStore {
         cx.spawn(move |this, mut cx| async move {
             let new_file = save.await?;
             let mtime = new_file.mtime;
+            this.update(&mut cx, |this, cx| {
+                if let Some(project_id) = this.remote_id {
+                    if has_changed_file {
+                        cx.emit(BufferStoreEvent::MessageToReplicas(Box::new(
+                            proto::UpdateBufferFile {
+                                project_id,
+                                buffer_id: buffer_id.to_proto(),
+                                file: Some(language::File::to_proto(&*new_file, cx)),
+                            }
+                            .into_envelope(0, None, None),
+                        )));
+                    }
+                    cx.emit(BufferStoreEvent::MessageToReplicas(Box::new(
+                        proto::BufferSaved {
+                            project_id,
+                            buffer_id: buffer_id.to_proto(),
+                            version: serialize_version(&version),
+                            mtime: mtime.map(|time| time.into()),
+                        }
+                        .into_envelope(0, None, None),
+                    )));
+                }
+            })?;
             buffer_handle.update(&mut cx, |buffer, cx| {
                 if has_changed_file {
                     buffer.file_updated(new_file, cx);
                 }
                 buffer.did_save(version.clone(), mtime, cx);
-            })?;
-            this.update(&mut cx, |_, cx| {
-                cx.emit(BufferStoreEvent::BufferSaved {
-                    buffer: buffer_handle,
-                    has_changed_file,
-                    saved_version: version,
-                })
-            })?;
-            Ok(())
+            })
         })
     }
 
@@ -525,10 +542,69 @@ impl BufferStore {
         })
     }
 
+    pub fn blame_buffer(
+        &self,
+        buffer: &Model<Buffer>,
+        version: Option<clock::Global>,
+        cx: &AppContext,
+    ) -> Task<Result<Blame>> {
+        let buffer = buffer.read(cx);
+        let Some(file) = File::from_dyn(buffer.file()) else {
+            return Task::ready(Err(anyhow!("buffer has no file")));
+        };
+
+        match file.worktree.clone().read(cx) {
+            Worktree::Local(worktree) => {
+                let worktree = worktree.snapshot();
+                let blame_params = maybe!({
+                    let (repo_entry, local_repo_entry) = match worktree.repo_for_path(&file.path) {
+                        Some(repo_for_path) => repo_for_path,
+                        None => anyhow::bail!(NoRepositoryError {}),
+                    };
+
+                    let relative_path = repo_entry
+                        .relativize(&worktree, &file.path)
+                        .context("failed to relativize buffer path")?;
+
+                    let repo = local_repo_entry.repo().clone();
+
+                    let content = match version {
+                        Some(version) => buffer.rope_for_version(&version).clone(),
+                        None => buffer.as_rope().clone(),
+                    };
+
+                    anyhow::Ok((repo, relative_path, content))
+                });
+
+                cx.background_executor().spawn(async move {
+                    let (repo, relative_path, content) = blame_params?;
+                    repo.blame(&relative_path, content)
+                        .with_context(|| format!("Failed to blame {:?}", relative_path.0))
+                })
+            }
+            Worktree::Remote(worktree) => {
+                let buffer_id = buffer.remote_id();
+                let version = buffer.version();
+                let project_id = worktree.project_id();
+                let client = worktree.client();
+                cx.spawn(|_| async move {
+                    let response = client
+                        .request(proto::BlameBuffer {
+                            project_id,
+                            buffer_id: buffer_id.into(),
+                            version: serialize_version(&version),
+                        })
+                        .await?;
+                    Ok(deserialize_blame_buffer_response(response))
+                })
+            }
+        }
+    }
+
     fn add_buffer(&mut self, buffer: Model<Buffer>, cx: &mut ModelContext<Self>) -> Result<()> {
         let remote_id = buffer.read(cx).remote_id();
         let is_remote = buffer.read(cx).replica_id() != 0;
-        let open_buffer = if self.retain_buffers {
+        let open_buffer = if self.remote_id.is_some() {
             OpenBuffer::Strong(buffer.clone())
         } else {
             OpenBuffer::Weak(buffer.downgrade())
@@ -664,7 +740,7 @@ impl BufferStore {
     }
 
     pub fn disconnected_from_host(&mut self, cx: &mut AppContext) {
-        self.set_retain_buffers(false, cx);
+        self.set_remote_id(None, cx);
 
         for buffer in self.buffers() {
             buffer.update(cx, |buffer, cx| {
@@ -677,10 +753,10 @@ impl BufferStore {
         self.remote_buffer_listeners.clear();
     }
 
-    pub fn set_retain_buffers(&mut self, retain_buffers: bool, cx: &mut AppContext) {
-        self.retain_buffers = retain_buffers;
+    pub fn set_remote_id(&mut self, remote_id: Option<u64>, cx: &mut AppContext) {
+        self.remote_id = remote_id;
         for open_buffer in self.opened_buffers.values_mut() {
-            if retain_buffers {
+            if remote_id.is_some() {
                 if let OpenBuffer::Weak(buffer) = open_buffer {
                     if let Some(buffer) = buffer.upgrade() {
                         *open_buffer = OpenBuffer::Strong(buffer);
@@ -741,8 +817,9 @@ impl BufferStore {
             return None;
         };
 
-        let (old_file, new_file) = buffer.update(cx, |buffer, cx| {
-            let old_file = File::from_dyn(buffer.file())?;
+        let events = buffer.update(cx, |buffer, cx| {
+            let file = buffer.file()?;
+            let old_file = File::from_dyn(Some(file))?;
             if old_file.worktree != *worktree {
                 return None;
             }
@@ -786,41 +863,54 @@ impl BufferStore {
                 return None;
             }
 
-            let old_file = Arc::new(old_file.clone());
-            let new_file = Arc::new(new_file);
-            buffer.file_updated(new_file.clone(), cx);
-            Some((old_file, new_file))
-        })?;
-
-        if new_file.path != old_file.path {
-            self.local_buffer_ids_by_path.remove(&ProjectPath {
-                path: old_file.path.clone(),
-                worktree_id: old_file.worktree_id(cx),
-            });
-            self.local_buffer_ids_by_path.insert(
-                ProjectPath {
-                    worktree_id: new_file.worktree_id(cx),
-                    path: new_file.path.clone(),
-                },
-                buffer_id,
-            );
-            cx.emit(BufferStoreEvent::BufferChangedFilePath {
-                buffer: buffer.clone(),
-                old_file: Some(old_file.clone()),
-            });
-        }
+            let mut events = Vec::new();
+            if new_file.path != old_file.path {
+                self.local_buffer_ids_by_path.remove(&ProjectPath {
+                    path: old_file.path.clone(),
+                    worktree_id: old_file.worktree_id(cx),
+                });
+                self.local_buffer_ids_by_path.insert(
+                    ProjectPath {
+                        worktree_id: new_file.worktree_id(cx),
+                        path: new_file.path.clone(),
+                    },
+                    buffer_id,
+                );
+                events.push(BufferStoreEvent::BufferChangedFilePath {
+                    buffer: cx.handle(),
+                    old_file: buffer.file().cloned(),
+                });
+            }
 
-        if new_file.entry_id != old_file.entry_id {
-            if let Some(entry_id) = old_file.entry_id {
-                self.local_buffer_ids_by_entry_id.remove(&entry_id);
+            if new_file.entry_id != old_file.entry_id {
+                if let Some(entry_id) = old_file.entry_id {
+                    self.local_buffer_ids_by_entry_id.remove(&entry_id);
+                }
+                if let Some(entry_id) = new_file.entry_id {
+                    self.local_buffer_ids_by_entry_id
+                        .insert(entry_id, buffer_id);
+                }
             }
-            if let Some(entry_id) = new_file.entry_id {
-                self.local_buffer_ids_by_entry_id
-                    .insert(entry_id, buffer_id);
+
+            if let Some(project_id) = self.remote_id {
+                events.push(BufferStoreEvent::MessageToReplicas(Box::new(
+                    proto::UpdateBufferFile {
+                        project_id,
+                        buffer_id: buffer_id.to_proto(),
+                        file: Some(new_file.to_proto(cx)),
+                    }
+                    .into_envelope(0, None, None),
+                )))
             }
+
+            buffer.file_updated(Arc::new(new_file), cx);
+            Some(events)
+        })?;
+
+        for event in events {
+            cx.emit(event);
         }
 
-        cx.emit(BufferStoreEvent::LocalBufferUpdated { buffer });
         None
     }
 
@@ -899,11 +989,10 @@ impl BufferStore {
         Ok(())
     }
 
-    pub fn handle_update_buffer(
-        &mut self,
+    pub async fn handle_update_buffer(
+        this: Model<Self>,
         envelope: TypedEnvelope<proto::UpdateBuffer>,
-        is_remote: bool,
-        cx: &mut AppContext,
+        mut cx: AsyncAppContext,
     ) -> Result<proto::Ack> {
         let payload = envelope.payload.clone();
         let buffer_id = BufferId::new(payload.buffer_id)?;
@@ -912,32 +1001,26 @@ impl BufferStore {
             .into_iter()
             .map(language::proto::deserialize_operation)
             .collect::<Result<Vec<_>, _>>()?;
-        match self.opened_buffers.entry(buffer_id) {
-            hash_map::Entry::Occupied(mut e) => match e.get_mut() {
-                OpenBuffer::Strong(buffer) => {
-                    buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
-                }
-                OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
-                OpenBuffer::Weak(_) => {}
-            },
-            hash_map::Entry::Vacant(e) => {
-                if !is_remote {
-                    debug_panic!(
-                        "received buffer update from {:?}",
-                        envelope.original_sender_id
-                    );
-                    return Err(anyhow!("received buffer update for non-remote project"));
+        this.update(&mut cx, |this, cx| {
+            match this.opened_buffers.entry(buffer_id) {
+                hash_map::Entry::Occupied(mut e) => match e.get_mut() {
+                    OpenBuffer::Strong(buffer) => {
+                        buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
+                    }
+                    OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
+                    OpenBuffer::Weak(_) => {}
+                },
+                hash_map::Entry::Vacant(e) => {
+                    e.insert(OpenBuffer::Operations(ops));
                 }
-                e.insert(OpenBuffer::Operations(ops));
             }
-        }
-        Ok(proto::Ack {})
+            Ok(proto::Ack {})
+        })?
     }
 
     pub fn handle_create_buffer_for_peer(
         &mut self,
         envelope: TypedEnvelope<proto::CreateBufferForPeer>,
-        mut worktrees: impl Iterator<Item = Model<Worktree>>,
         replica_id: u16,
         capability: Capability,
         cx: &mut ModelContext<Self>,
@@ -954,8 +1037,10 @@ impl BufferStore {
                     let mut buffer_file = None;
                     if let Some(file) = state.file.take() {
                         let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id);
-                        let worktree = worktrees
-                            .find(|worktree| worktree.read(cx).id() == worktree_id)
+                        let worktree = self
+                            .worktree_store
+                            .read(cx)
+                            .worktree_for_id(worktree_id, cx)
                             .ok_or_else(|| {
                                 anyhow!("no worktree found for id {}", file.worktree_id)
                             })?;
@@ -1018,14 +1103,74 @@ impl BufferStore {
         Ok(())
     }
 
+    pub async fn handle_update_buffer_file(
+        this: Model<Self>,
+        envelope: TypedEnvelope<proto::UpdateBufferFile>,
+        mut cx: AsyncAppContext,
+    ) -> Result<()> {
+        let buffer_id = envelope.payload.buffer_id;
+        let buffer_id = BufferId::new(buffer_id)?;
+
+        this.update(&mut cx, |this, cx| {
+            let payload = envelope.payload.clone();
+            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
+                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
+                let worktree = this
+                    .worktree_store
+                    .read(cx)
+                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
+                    .ok_or_else(|| anyhow!("no such worktree"))?;
+                let file = File::from_proto(file, worktree, cx)?;
+                let old_file = buffer.update(cx, |buffer, cx| {
+                    let old_file = buffer.file().cloned();
+                    let new_path = file.path.clone();
+                    buffer.file_updated(Arc::new(file), cx);
+                    if old_file
+                        .as_ref()
+                        .map_or(true, |old| *old.path() != new_path)
+                    {
+                        Some(old_file)
+                    } else {
+                        None
+                    }
+                });
+                if let Some(old_file) = old_file {
+                    cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file });
+                }
+            }
+            Ok(())
+        })?
+    }
+
+    pub async fn handle_update_diff_base(
+        this: Model<Self>,
+        envelope: TypedEnvelope<proto::UpdateDiffBase>,
+        mut cx: AsyncAppContext,
+    ) -> Result<()> {
+        this.update(&mut cx, |this, cx| {
+            let buffer_id = envelope.payload.buffer_id;
+            let buffer_id = BufferId::new(buffer_id)?;
+            if let Some(buffer) = this.get_possibly_incomplete(buffer_id) {
+                buffer.update(cx, |buffer, cx| {
+                    buffer.set_diff_base(envelope.payload.diff_base, cx)
+                });
+            }
+            Ok(())
+        })?
+    }
+
     pub async fn handle_save_buffer(
         this: Model<Self>,
-        project_id: u64,
         envelope: TypedEnvelope<proto::SaveBuffer>,
         mut cx: AsyncAppContext,
     ) -> Result<proto::BufferSaved> {
         let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
-        let buffer = this.update(&mut cx, |this, _| this.get_existing(buffer_id))??;
+        let (buffer, project_id) = this.update(&mut cx, |this, _| {
+            anyhow::Ok((
+                this.get_existing(buffer_id)?,
+                this.remote_id.context("project is not shared")?,
+            ))
+        })??;
         buffer
             .update(&mut cx, |buffer, _| {
                 buffer.wait_for_version(deserialize_version(&envelope.payload.version))
@@ -1090,6 +1235,27 @@ impl BufferStore {
         })
     }
 
+    pub async fn handle_blame_buffer(
+        this: Model<Self>,
+        envelope: TypedEnvelope<proto::BlameBuffer>,
+        mut cx: AsyncAppContext,
+    ) -> Result<proto::BlameBufferResponse> {
+        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
+        let version = deserialize_version(&envelope.payload.version);
+        let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??;
+        buffer
+            .update(&mut cx, |buffer, _| {
+                buffer.wait_for_version(version.clone())
+            })?
+            .await?;
+        let blame = this
+            .update(&mut cx, |this, cx| {
+                this.blame_buffer(&buffer, Some(version), cx)
+            })?
+            .await?;
+        Ok(serialize_blame_buffer_response(blame))
+    }
+
     pub async fn wait_for_loading_buffer(
         mut receiver: postage::watch::Receiver<Option<Result<Model<Buffer>, Arc<anyhow::Error>>>>,
     ) -> Result<Model<Buffer>, Arc<anyhow::Error>> {
@@ -1121,3 +1287,101 @@ fn is_not_found_error(error: &anyhow::Error) -> bool {
         .downcast_ref::<io::Error>()
         .is_some_and(|err| err.kind() == io::ErrorKind::NotFound)
 }
+
+fn serialize_blame_buffer_response(blame: git::blame::Blame) -> proto::BlameBufferResponse {
+    let entries = blame
+        .entries
+        .into_iter()
+        .map(|entry| proto::BlameEntry {
+            sha: entry.sha.as_bytes().into(),
+            start_line: entry.range.start,
+            end_line: entry.range.end,
+            original_line_number: entry.original_line_number,
+            author: entry.author.clone(),
+            author_mail: entry.author_mail.clone(),
+            author_time: entry.author_time,
+            author_tz: entry.author_tz.clone(),
+            committer: entry.committer.clone(),
+            committer_mail: entry.committer_mail.clone(),
+            committer_time: entry.committer_time,
+            committer_tz: entry.committer_tz.clone(),
+            summary: entry.summary.clone(),
+            previous: entry.previous.clone(),
+            filename: entry.filename.clone(),
+        })
+        .collect::<Vec<_>>();
+
+    let messages = blame
+        .messages
+        .into_iter()
+        .map(|(oid, message)| proto::CommitMessage {
+            oid: oid.as_bytes().into(),
+            message,
+        })
+        .collect::<Vec<_>>();
+
+    let permalinks = blame
+        .permalinks
+        .into_iter()
+        .map(|(oid, url)| proto::CommitPermalink {
+            oid: oid.as_bytes().into(),
+            permalink: url.to_string(),
+        })
+        .collect::<Vec<_>>();
+
+    proto::BlameBufferResponse {
+        entries,
+        messages,
+        permalinks,
+        remote_url: blame.remote_url,
+    }
+}
+
+fn deserialize_blame_buffer_response(response: proto::BlameBufferResponse) -> git::blame::Blame {
+    let entries = response
+        .entries
+        .into_iter()
+        .filter_map(|entry| {
+            Some(git::blame::BlameEntry {
+                sha: git::Oid::from_bytes(&entry.sha).ok()?,
+                range: entry.start_line..entry.end_line,
+                original_line_number: entry.original_line_number,
+                committer: entry.committer,
+                committer_time: entry.committer_time,
+                committer_tz: entry.committer_tz,
+                committer_mail: entry.committer_mail,
+                author: entry.author,
+                author_mail: entry.author_mail,
+                author_time: entry.author_time,
+                author_tz: entry.author_tz,
+                summary: entry.summary,
+                previous: entry.previous,
+                filename: entry.filename,
+            })
+        })
+        .collect::<Vec<_>>();
+
+    let messages = response
+        .messages
+        .into_iter()
+        .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
+        .collect::<HashMap<_, _>>();
+
+    let permalinks = response
+        .permalinks
+        .into_iter()
+        .filter_map(|permalink| {
+            Some((
+                git::Oid::from_bytes(&permalink.oid).ok()?,
+                Url::from_str(&permalink.permalink).ok()?,
+            ))
+        })
+        .collect::<HashMap<_, _>>();
+
+    Blame {
+        entries,
+        permalinks,
+        messages,
+        remote_url: response.remote_url,
+    }
+}

crates/project/src/project.rs 🔗

@@ -39,7 +39,7 @@ use gpui::{
     AnyModel, AppContext, AsyncAppContext, BackgroundExecutor, BorrowAppContext, Context, Entity,
     EventEmitter, Model, ModelContext, PromptLevel, SharedString, Task, WeakModel, WindowContext,
 };
-use http_client::{HttpClient, Url};
+use http_client::HttpClient;
 use itertools::Itertools;
 use language::{
     language_settings::{
@@ -101,7 +101,7 @@ use std::{
     ops::Range,
     path::{self, Component, Path, PathBuf},
     process::Stdio,
-    str::{self, FromStr},
+    str,
     sync::{
         atomic::{AtomicUsize, Ordering::SeqCst},
         Arc,
@@ -690,23 +690,15 @@ impl Project {
         client.add_model_message_handler(Self::handle_add_collaborator);
         client.add_model_message_handler(Self::handle_update_project_collaborator);
         client.add_model_message_handler(Self::handle_remove_collaborator);
-        client.add_model_message_handler(Self::handle_buffer_reloaded);
-        client.add_model_message_handler(Self::handle_buffer_saved);
         client.add_model_message_handler(Self::handle_start_language_server);
         client.add_model_message_handler(Self::handle_update_language_server);
         client.add_model_message_handler(Self::handle_update_project);
         client.add_model_message_handler(Self::handle_unshare_project);
         client.add_model_message_handler(Self::handle_create_buffer_for_peer);
-        client.add_model_message_handler(Self::handle_update_buffer_file);
         client.add_model_request_handler(Self::handle_update_buffer);
         client.add_model_message_handler(Self::handle_update_diagnostic_summary);
         client.add_model_message_handler(Self::handle_update_worktree);
         client.add_model_message_handler(Self::handle_update_worktree_settings);
-        client.add_model_request_handler(Self::handle_create_project_entry);
-        client.add_model_request_handler(Self::handle_rename_project_entry);
-        client.add_model_request_handler(Self::handle_copy_project_entry);
-        client.add_model_request_handler(Self::handle_delete_project_entry);
-        client.add_model_request_handler(Self::handle_expand_project_entry);
         client.add_model_request_handler(Self::handle_apply_additional_edits_for_completion);
         client.add_model_request_handler(Self::handle_resolve_completion_documentation);
         client.add_model_request_handler(Self::handle_apply_code_action);
@@ -732,15 +724,25 @@ impl Project {
         client.add_model_request_handler(Self::handle_open_buffer_by_id);
         client.add_model_request_handler(Self::handle_open_buffer_by_path);
         client.add_model_request_handler(Self::handle_open_new_buffer);
-        client.add_model_request_handler(Self::handle_save_buffer);
-        client.add_model_message_handler(Self::handle_update_diff_base);
         client.add_model_request_handler(Self::handle_lsp_command::<lsp_ext_command::ExpandMacro>);
-        client.add_model_request_handler(Self::handle_blame_buffer);
         client.add_model_request_handler(Self::handle_multi_lsp_query);
         client.add_model_request_handler(Self::handle_restart_language_servers);
         client.add_model_request_handler(Self::handle_task_context_for_location);
         client.add_model_request_handler(Self::handle_task_templates);
         client.add_model_request_handler(Self::handle_lsp_command::<LinkedEditingRange>);
+
+        client.add_model_request_handler(WorktreeStore::handle_create_project_entry);
+        client.add_model_request_handler(WorktreeStore::handle_rename_project_entry);
+        client.add_model_request_handler(WorktreeStore::handle_copy_project_entry);
+        client.add_model_request_handler(WorktreeStore::handle_delete_project_entry);
+        client.add_model_request_handler(WorktreeStore::handle_expand_project_entry);
+
+        client.add_model_message_handler(BufferStore::handle_buffer_reloaded);
+        client.add_model_message_handler(BufferStore::handle_buffer_saved);
+        client.add_model_message_handler(BufferStore::handle_update_buffer_file);
+        client.add_model_message_handler(BufferStore::handle_update_diff_base);
+        client.add_model_request_handler(BufferStore::handle_save_buffer);
+        client.add_model_request_handler(BufferStore::handle_blame_buffer);
     }
 
     pub fn local(
@@ -765,7 +767,7 @@ impl Project {
                 .detach();
 
             let buffer_store =
-                cx.new_model(|cx| BufferStore::new(worktree_store.clone(), false, cx));
+                cx.new_model(|cx| BufferStore::new(worktree_store.clone(), None, cx));
             cx.subscribe(&buffer_store, Self::on_buffer_store_event)
                 .detach();
 
@@ -830,7 +832,7 @@ impl Project {
     }
 
     pub fn ssh(
-        ssh_session: Arc<SshSession>,
+        ssh: Arc<SshSession>,
         client: Arc<Client>,
         node: Arc<dyn NodeRuntime>,
         user_store: Model<UserStore>,
@@ -840,11 +842,14 @@ impl Project {
     ) -> Model<Self> {
         let this = Self::local(client, node, user_store, languages, fs, cx);
         this.update(cx, |this, cx| {
-            ssh_session.add_message_handler(cx.weak_model(), Self::handle_update_worktree);
-            ssh_session.add_message_handler(cx.weak_model(), Self::handle_create_buffer_for_peer);
-            ssh_session.add_message_handler(cx.weak_model(), Self::handle_update_buffer_file);
-            ssh_session.add_message_handler(cx.weak_model(), Self::handle_update_diff_base);
-            this.ssh_session = Some(ssh_session);
+            let buffer_store = this.buffer_store.downgrade();
+
+            ssh.add_message_handler(cx.weak_model(), Self::handle_update_worktree);
+            ssh.add_message_handler(cx.weak_model(), Self::handle_create_buffer_for_peer);
+            ssh.add_message_handler(buffer_store.clone(), BufferStore::handle_update_buffer_file);
+            ssh.add_message_handler(buffer_store.clone(), BufferStore::handle_update_diff_base);
+
+            this.ssh_session = Some(ssh);
         });
         this
     }
@@ -877,7 +882,10 @@ impl Project {
     ) -> Result<Model<Self>> {
         client.authenticate_and_connect(true, &cx).await?;
 
-        let subscription = client.subscribe_to_entity(remote_id)?;
+        let subscriptions = (
+            client.subscribe_to_entity::<Self>(remote_id)?,
+            client.subscribe_to_entity::<BufferStore>(remote_id)?,
+        );
         let response = client
             .request_envelope(proto::JoinProject {
                 project_id: remote_id,
@@ -885,7 +893,7 @@ impl Project {
             .await?;
         Self::from_join_project_response(
             response,
-            subscription,
+            subscriptions,
             client,
             user_store,
             languages,
@@ -897,7 +905,10 @@ impl Project {
 
     async fn from_join_project_response(
         response: TypedEnvelope<proto::JoinProjectResponse>,
-        subscription: PendingEntitySubscription<Project>,
+        subscription: (
+            PendingEntitySubscription<Project>,
+            PendingEntitySubscription<BufferStore>,
+        ),
         client: Arc<Client>,
         user_store: Model<UserStore>,
         languages: Arc<LanguageRegistry>,
@@ -906,6 +917,11 @@ impl Project {
     ) -> Result<Model<Self>> {
         let remote_id = response.payload.project_id;
         let role = response.payload.role();
+
+        let worktree_store = cx.new_model(|_| WorktreeStore::new(true))?;
+        let buffer_store =
+            cx.new_model(|cx| BufferStore::new(worktree_store.clone(), Some(remote_id), cx))?;
+
         let this = cx.new_model(|cx| {
             let replica_id = response.payload.replica_id as ReplicaId;
             let tasks = Inventory::new(cx);
@@ -913,9 +929,7 @@ impl Project {
             let snippets =
                 SnippetProvider::new(fs.clone(), BTreeSet::from_iter([global_snippets_dir]), cx);
             let yarn = YarnPathStore::new(fs.clone(), cx);
-            // BIG CAUTION NOTE: The order in which we initialize fields here matters and it should match what's done in Self::local.
-            // Otherwise, you might run into issues where worktree id on remote is different than what's on local host.
-            // That's because Worktree's identifier is entity id, which should probably be changed.
+
             let mut worktrees = Vec::new();
             for worktree in response.payload.worktrees {
                 let worktree =
@@ -927,16 +941,12 @@ impl Project {
             cx.spawn(move |this, cx| Self::send_buffer_ordered_messages(this, rx, cx))
                 .detach();
 
-            let worktree_store = cx.new_model(|_| WorktreeStore::new(true));
-
-            let buffer_store =
-                cx.new_model(|cx| BufferStore::new(worktree_store.clone(), true, cx));
             cx.subscribe(&buffer_store, Self::on_buffer_store_event)
                 .detach();
 
             let mut this = Self {
                 buffer_ordered_messages_tx: tx,
-                buffer_store,
+                buffer_store: buffer_store.clone(),
                 worktree_store,
                 shared_buffers: Default::default(),
                 loading_worktrees: Default::default(),
@@ -1018,7 +1028,11 @@ impl Project {
             }
             this
         })?;
-        let subscription = subscription.set_model(&this, &mut cx);
+
+        let subscriptions = [
+            subscription.0.set_model(&this, &mut cx),
+            subscription.1.set_model(&buffer_store, &mut cx),
+        ];
 
         let user_ids = response
             .payload
@@ -1032,7 +1046,7 @@ impl Project {
 
         this.update(&mut cx, |this, cx| {
             this.set_collaborators_from_proto(response.payload.collaborators, cx)?;
-            this.client_subscriptions.push(subscription);
+            this.client_subscriptions.extend(subscriptions);
             anyhow::Ok(())
         })??;
 
@@ -1049,7 +1063,10 @@ impl Project {
     ) -> Result<Model<Self>> {
         client.authenticate_and_connect(true, &cx).await?;
 
-        let subscription = client.subscribe_to_entity(remote_id.0)?;
+        let subscriptions = (
+            client.subscribe_to_entity::<Self>(remote_id.0)?,
+            client.subscribe_to_entity::<BufferStore>(remote_id.0)?,
+        );
         let response = client
             .request_envelope(proto::JoinHostedProject {
                 project_id: remote_id.0,
@@ -1057,7 +1074,7 @@ impl Project {
             .await?;
         Self::from_join_project_response(
             response,
-            subscription,
+            subscriptions,
             client,
             user_store,
             languages,
@@ -1572,14 +1589,20 @@ impl Project {
                 return Err(anyhow!("project was already shared"));
             }
         }
-        self.client_subscriptions.push(
+        self.client_subscriptions.extend([
             self.client
                 .subscribe_to_entity(project_id)?
                 .set_model(&cx.handle(), &mut cx.to_async()),
-        );
+            self.client
+                .subscribe_to_entity(project_id)?
+                .set_model(&self.worktree_store, &mut cx.to_async()),
+            self.client
+                .subscribe_to_entity(project_id)?
+                .set_model(&self.buffer_store, &mut cx.to_async()),
+        ]);
 
         self.buffer_store.update(cx, |buffer_store, cx| {
-            buffer_store.set_retain_buffers(true, cx)
+            buffer_store.set_remote_id(Some(project_id), cx)
         });
         self.worktree_store.update(cx, |store, cx| {
             store.set_shared(true, cx);
@@ -1789,9 +1812,8 @@ impl Project {
             self.worktree_store.update(cx, |store, cx| {
                 store.set_shared(false, cx);
             });
-            self.buffer_store.update(cx, |buffer_store, cx| {
-                buffer_store.set_retain_buffers(false, cx)
-            });
+            self.buffer_store
+                .update(cx, |buffer_store, cx| buffer_store.set_remote_id(None, cx));
             self.client
                 .send(proto::UnshareProject {
                     project_id: remote_id,
@@ -2377,72 +2399,15 @@ impl Project {
                 self.register_buffer(buffer, cx).log_err();
             }
             BufferStoreEvent::BufferChangedFilePath { buffer, old_file } => {
-                if let Some(old_file) = &old_file {
+                if let Some(old_file) = File::from_dyn(old_file.as_ref()) {
                     self.unregister_buffer_from_language_servers(&buffer, old_file, cx);
                 }
 
                 self.detect_language_for_buffer(&buffer, cx);
                 self.register_buffer_with_language_servers(&buffer, cx);
             }
-            BufferStoreEvent::LocalBufferUpdated { buffer } => {
-                let buffer = buffer.read(cx);
-                let buffer_id = buffer.remote_id();
-                let Some(new_file) = buffer.file() else {
-                    return;
-                };
-                if let Some(project_id) = self.remote_id() {
-                    self.client
-                        .send(proto::UpdateBufferFile {
-                            project_id,
-                            buffer_id: buffer_id.into(),
-                            file: Some(new_file.to_proto(cx)),
-                        })
-                        .log_err();
-                }
-            }
-            BufferStoreEvent::DiffBaseUpdated { buffer } => {
-                let buffer = buffer.read(cx);
-                let buffer_id = buffer.remote_id();
-                let diff_base = buffer.diff_base();
-                if let Some(project_id) = self.remote_id() {
-                    self.client
-                        .send(proto::UpdateDiffBase {
-                            project_id,
-                            buffer_id: buffer_id.to_proto(),
-                            diff_base: diff_base.map(|b| b.to_string()),
-                        })
-                        .log_err();
-                }
-            }
-            BufferStoreEvent::BufferSaved {
-                buffer: buffer_handle,
-                has_changed_file,
-                saved_version,
-            } => {
-                let buffer = buffer_handle.read(cx);
-                let buffer_id = buffer.remote_id();
-                let Some(new_file) = buffer.file() else {
-                    return;
-                };
-                if let Some(project_id) = self.remote_id() {
-                    self.client
-                        .send(proto::BufferSaved {
-                            project_id,
-                            buffer_id: buffer_id.into(),
-                            version: serialize_version(&saved_version),
-                            mtime: new_file.mtime().map(|time| time.into()),
-                        })
-                        .log_err();
-                    if *has_changed_file {
-                        self.client
-                            .send(proto::UpdateBufferFile {
-                                project_id,
-                                buffer_id: buffer_id.into(),
-                                file: Some(new_file.to_proto(cx)),
-                            })
-                            .log_err();
-                    }
-                }
+            BufferStoreEvent::MessageToReplicas(message) => {
+                self.client.send_dynamic(message.as_ref().clone()).log_err();
             }
         }
     }
@@ -8428,96 +8393,11 @@ impl Project {
         version: Option<clock::Global>,
         cx: &AppContext,
     ) -> Task<Result<Blame>> {
-        if self.is_local() {
-            let blame_params = maybe!({
-                let buffer = buffer.read(cx);
-                let buffer_project_path = buffer
-                    .project_path(cx)
-                    .context("failed to get buffer project path")?;
-
-                let worktree = self
-                    .worktree_for_id(buffer_project_path.worktree_id, cx)
-                    .context("failed to get worktree")?
-                    .read(cx)
-                    .as_local()
-                    .context("worktree was not local")?
-                    .snapshot();
-
-                let (repo_entry, local_repo_entry) =
-                    match worktree.repo_for_path(&buffer_project_path.path) {
-                        Some(repo_for_path) => repo_for_path,
-                        None => anyhow::bail!(NoRepositoryError {}),
-                    };
-
-                let relative_path = repo_entry
-                    .relativize(&worktree, &buffer_project_path.path)
-                    .context("failed to relativize buffer path")?;
-
-                let repo = local_repo_entry.repo().clone();
-
-                let content = match version {
-                    Some(version) => buffer.rope_for_version(&version).clone(),
-                    None => buffer.as_rope().clone(),
-                };
-
-                anyhow::Ok((repo, relative_path, content))
-            });
-
-            cx.background_executor().spawn(async move {
-                let (repo, relative_path, content) = blame_params?;
-                repo.blame(&relative_path, content)
-                    .with_context(|| format!("Failed to blame {:?}", relative_path.0))
-            })
-        } else {
-            let project_id = self.remote_id();
-            let buffer_id = buffer.read(cx).remote_id();
-            let client = self.client.clone();
-            let version = buffer.read(cx).version();
-
-            cx.spawn(|_| async move {
-                let project_id = project_id.context("unable to get project id for buffer")?;
-                let response = client
-                    .request(proto::BlameBuffer {
-                        project_id,
-                        buffer_id: buffer_id.into(),
-                        version: serialize_version(&version),
-                    })
-                    .await?;
-
-                Ok(deserialize_blame_buffer_response(response))
-            })
-        }
+        self.buffer_store.read(cx).blame_buffer(buffer, version, cx)
     }
 
     // RPC message handlers
 
-    async fn handle_blame_buffer(
-        this: Model<Self>,
-        envelope: TypedEnvelope<proto::BlameBuffer>,
-        mut cx: AsyncAppContext,
-    ) -> Result<proto::BlameBufferResponse> {
-        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
-        let version = deserialize_version(&envelope.payload.version);
-
-        let buffer = this.update(&mut cx, |this, cx| {
-            this.buffer_store.read(cx).get_existing(buffer_id)
-        })??;
-
-        buffer
-            .update(&mut cx, |buffer, _| {
-                buffer.wait_for_version(version.clone())
-            })?
-            .await?;
-
-        let blame = this
-            .update(&mut cx, |this, cx| {
-                this.blame_buffer(&buffer, Some(version), cx)
-            })?
-            .await?;
-
-        Ok(serialize_blame_buffer_response(blame))
-    }
-
     async fn handle_multi_lsp_query(
         project: Model<Self>,
         envelope: TypedEnvelope<proto::MultiLspQuery>,
@@ -8827,51 +8707,6 @@ impl Project {
         })?
     }
 
-    async fn handle_create_project_entry(
-        this: Model<Self>,
-        envelope: TypedEnvelope<proto::CreateProjectEntry>,
-        mut cx: AsyncAppContext,
-    ) -> Result<proto::ProjectEntryResponse> {
-        let worktree_store = this.update(&mut cx, |this, _| this.worktree_store.clone())?;
-        WorktreeStore::handle_create_project_entry(worktree_store, envelope, cx).await
-    }
-
-    async fn handle_rename_project_entry(
-        this: Model<Self>,
-        envelope: TypedEnvelope<proto::RenameProjectEntry>,
-        mut cx: AsyncAppContext,
-    ) -> Result<proto::ProjectEntryResponse> {
-        let worktree_store = this.update(&mut cx, |this, _| this.worktree_store.clone())?;
-        WorktreeStore::handle_rename_project_entry(worktree_store, envelope, cx).await
-    }
-
-    async fn handle_copy_project_entry(
-        this: Model<Self>,
-        envelope: TypedEnvelope<proto::CopyProjectEntry>,
-        mut cx: AsyncAppContext,
-    ) -> Result<proto::ProjectEntryResponse> {
-        let worktree_store = this.update(&mut cx, |this, _| this.worktree_store.clone())?;
-        WorktreeStore::handle_copy_project_entry(worktree_store, envelope, cx).await
-    }
-
-    async fn handle_delete_project_entry(
-        this: Model<Self>,
-        envelope: TypedEnvelope<proto::DeleteProjectEntry>,
-        mut cx: AsyncAppContext,
-    ) -> Result<proto::ProjectEntryResponse> {
-        let worktree_store = this.update(&mut cx, |this, _| this.worktree_store.clone())?;
-        WorktreeStore::handle_delete_project_entry(worktree_store, envelope, cx).await
-    }
-
-    async fn handle_expand_project_entry(
-        this: Model<Self>,
-        envelope: TypedEnvelope<proto::ExpandProjectEntry>,
-        mut cx: AsyncAppContext,
-    ) -> Result<proto::ExpandProjectEntryResponse> {
-        let worktree_store = this.update(&mut cx, |this, _| this.worktree_store.clone())?;
-        WorktreeStore::handle_expand_project_entry(worktree_store, envelope, cx).await
-    }
-
     async fn handle_update_diagnostic_summary(
         this: Model<Self>,
         envelope: TypedEnvelope<proto::UpdateDiagnosticSummary>,
@@ -9008,9 +8843,9 @@ impl Project {
     async fn handle_update_buffer(
         this: Model<Self>,
         envelope: TypedEnvelope<proto::UpdateBuffer>,
-        mut cx: AsyncAppContext,
+        cx: AsyncAppContext,
     ) -> Result<proto::Ack> {
-        this.update(&mut cx, |this, cx| {
+        let buffer_store = this.read_with(&cx, |this, cx| {
             if let Some(ssh) = &this.ssh_session {
                 let mut payload = envelope.payload.clone();
                 payload.project_id = 0;
@@ -9018,10 +8853,9 @@ impl Project {
                     .spawn(ssh.request(payload))
                     .detach_and_log_err(cx);
             }
-            this.buffer_store.update(cx, |buffer_store, cx| {
-                buffer_store.handle_update_buffer(envelope, this.is_remote(), cx)
-            })
-        })?
+            this.buffer_store.clone()
+        })?;
+        BufferStore::handle_update_buffer(buffer_store, envelope, cx).await
     }
 
     async fn handle_create_buffer_for_peer(
@@ -9033,7 +8867,6 @@ impl Project {
             this.buffer_store.update(cx, |buffer_store, cx| {
                 buffer_store.handle_create_buffer_for_peer(
                     envelope,
-                    this.worktrees(cx).collect::<Vec<_>>().into_iter(),
                     this.replica_id(),
                     this.capability(),
                     cx,
@@ -9042,69 +8875,6 @@ impl Project {
         })?
     }
 
-    async fn handle_update_diff_base(
-        this: Model<Self>,
-        envelope: TypedEnvelope<proto::UpdateDiffBase>,
-        mut cx: AsyncAppContext,
-    ) -> Result<()> {
-        this.update(&mut cx, |this, cx| {
-            let buffer_id = envelope.payload.buffer_id;
-            let buffer_id = BufferId::new(buffer_id)?;
-            if let Some(buffer) = this
-                .buffer_store
-                .read(cx)
-                .get_possibly_incomplete(buffer_id)
-            {
-                buffer.update(cx, |buffer, cx| {
-                    buffer.set_diff_base(envelope.payload.diff_base, cx)
-                });
-            }
-            Ok(())
-        })?
-    }
-
-    async fn handle_update_buffer_file(
-        this: Model<Self>,
-        envelope: TypedEnvelope<proto::UpdateBufferFile>,
-        mut cx: AsyncAppContext,
-    ) -> Result<()> {
-        let buffer_id = envelope.payload.buffer_id;
-        let buffer_id = BufferId::new(buffer_id)?;
-
-        this.update(&mut cx, |this, cx| {
-            let payload = envelope.payload.clone();
-            if let Some(buffer) = this
-                .buffer_store
-                .read(cx)
-                .get_possibly_incomplete(buffer_id)
-            {
-                let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
-                let worktree = this
-                    .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
-                    .ok_or_else(|| anyhow!("no such worktree"))?;
-                let file = File::from_proto(file, worktree, cx)?;
-                buffer.update(cx, |buffer, cx| {
-                    buffer.file_updated(Arc::new(file), cx);
-                });
-                this.detect_language_for_buffer(&buffer, cx);
-            }
-            Ok(())
-        })?
-    }
-
-    async fn handle_save_buffer(
-        this: Model<Self>,
-        envelope: TypedEnvelope<proto::SaveBuffer>,
-        mut cx: AsyncAppContext,
-    ) -> Result<proto::BufferSaved> {
-        let (buffer_store, project_id) = this.update(&mut cx, |this, _| {
-            let buffer_store = this.buffer_store.clone();
-            let project_id = this.remote_id().context("not connected")?;
-            anyhow::Ok((buffer_store, project_id))
-        })??;
-        BufferStore::handle_save_buffer(buffer_store, project_id, envelope, cx).await
-    }
-
     async fn handle_reload_buffers(
         this: Model<Self>,
         envelope: TypedEnvelope<proto::ReloadBuffers>,
@@ -10252,24 +10022,6 @@ impl Project {
         })
     }
 
-    async fn handle_buffer_saved(
-        this: Model<Self>,
-        envelope: TypedEnvelope<proto::BufferSaved>,
-        mut cx: AsyncAppContext,
-    ) -> Result<()> {
-        let buffer_store = this.update(&mut cx, |this, _| this.buffer_store.clone())?;
-        BufferStore::handle_buffer_saved(buffer_store, envelope, cx).await
-    }
-
-    async fn handle_buffer_reloaded(
-        this: Model<Self>,
-        envelope: TypedEnvelope<proto::BufferReloaded>,
-        mut cx: AsyncAppContext,
-    ) -> Result<()> {
-        let buffer_store = this.update(&mut cx, |this, _| this.buffer_store.clone())?;
-        BufferStore::handle_buffer_reloaded(buffer_store, envelope, cx).await
-    }
-
     #[allow(clippy::type_complexity)]
     fn edits_from_lsp(
         &mut self,
@@ -11488,104 +11240,6 @@ async fn load_shell_environment(
     Ok(parsed_env)
 }
 
-fn serialize_blame_buffer_response(blame: git::blame::Blame) -> proto::BlameBufferResponse {
-    let entries = blame
-        .entries
-        .into_iter()
-        .map(|entry| proto::BlameEntry {
-            sha: entry.sha.as_bytes().into(),
-            start_line: entry.range.start,
-            end_line: entry.range.end,
-            original_line_number: entry.original_line_number,
-            author: entry.author.clone(),
-            author_mail: entry.author_mail.clone(),
-            author_time: entry.author_time,
-            author_tz: entry.author_tz.clone(),
-            committer: entry.committer.clone(),
-            committer_mail: entry.committer_mail.clone(),
-            committer_time: entry.committer_time,
-            committer_tz: entry.committer_tz.clone(),
-            summary: entry.summary.clone(),
-            previous: entry.previous.clone(),
-            filename: entry.filename.clone(),
-        })
-        .collect::<Vec<_>>();
-
-    let messages = blame
-        .messages
-        .into_iter()
-        .map(|(oid, message)| proto::CommitMessage {
-            oid: oid.as_bytes().into(),
-            message,
-        })
-        .collect::<Vec<_>>();
-
-    let permalinks = blame
-        .permalinks
-        .into_iter()
-        .map(|(oid, url)| proto::CommitPermalink {
-            oid: oid.as_bytes().into(),
-            permalink: url.to_string(),
-        })
-        .collect::<Vec<_>>();
-
-    proto::BlameBufferResponse {
-        entries,
-        messages,
-        permalinks,
-        remote_url: blame.remote_url,
-    }
-}
-
-fn deserialize_blame_buffer_response(response: proto::BlameBufferResponse) -> git::blame::Blame {
-    let entries = response
-        .entries
-        .into_iter()
-        .filter_map(|entry| {
-            Some(git::blame::BlameEntry {
-                sha: git::Oid::from_bytes(&entry.sha).ok()?,
-                range: entry.start_line..entry.end_line,
-                original_line_number: entry.original_line_number,
-                committer: entry.committer,
-                committer_time: entry.committer_time,
-                committer_tz: entry.committer_tz,
-                committer_mail: entry.committer_mail,
-                author: entry.author,
-                author_mail: entry.author_mail,
-                author_time: entry.author_time,
-                author_tz: entry.author_tz,
-                summary: entry.summary,
-                previous: entry.previous,
-                filename: entry.filename,
-            })
-        })
-        .collect::<Vec<_>>();
-
-    let messages = response
-        .messages
-        .into_iter()
-        .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
-        .collect::<HashMap<_, _>>();
-
-    let permalinks = response
-        .permalinks
-        .into_iter()
-        .filter_map(|permalink| {
-            Some((
-                git::Oid::from_bytes(&permalink.oid).ok()?,
-                Url::from_str(&permalink.permalink).ok()?,
-            ))
-        })
-        .collect::<HashMap<_, _>>();
-
-    Blame {
-        entries,
-        permalinks,
-        messages,
-        remote_url: response.remote_url,
-    }
-}
-
 fn remove_empty_hover_blocks(mut hover: Hover) -> Option<Hover> {
     hover
         .contents

crates/project/src/project_tests.rs 🔗

@@ -2,6 +2,7 @@ use crate::{Event, *};
 use fs::FakeFs;
 use futures::{future, StreamExt};
 use gpui::{AppContext, SemanticVersion, UpdateGlobal};
+use http_client::Url;
 use language::{
     language_settings::{AllLanguageSettings, LanguageSettingsContent},
     tree_sitter_rust, tree_sitter_typescript, Diagnostic, FakeLspAdapter, LanguageConfig,

crates/proto/src/proto.rs 🔗

@@ -103,6 +103,10 @@ impl AnyProtoClient {
         let envelope = request.into_envelope(0, None, None);
         self.0.send(envelope)
     }
+
+    pub fn send_dynamic(&self, message: Envelope) -> anyhow::Result<()> {
+        self.0.send(message)
+    }
 }
 
 impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {

crates/remote_server/src/headless_project.rs 🔗

@@ -40,14 +40,18 @@ impl HeadlessProject {
         let this = cx.weak_model();
 
         let worktree_store = cx.new_model(|_| WorktreeStore::new(true));
-        let buffer_store = cx.new_model(|cx| BufferStore::new(worktree_store.clone(), true, cx));
+        let buffer_store =
+            cx.new_model(|cx| BufferStore::new(worktree_store.clone(), Some(PROJECT_ID), cx));
         cx.subscribe(&buffer_store, Self::on_buffer_store_event)
             .detach();
 
         session.add_request_handler(this.clone(), Self::handle_add_worktree);
         session.add_request_handler(this.clone(), Self::handle_open_buffer_by_path);
-        session.add_request_handler(this.clone(), Self::handle_update_buffer);
-        session.add_request_handler(this.clone(), Self::handle_save_buffer);
+
+        session.add_request_handler(buffer_store.downgrade(), BufferStore::handle_blame_buffer);
+        session.add_request_handler(buffer_store.downgrade(), BufferStore::handle_update_buffer);
+        session.add_request_handler(buffer_store.downgrade(), BufferStore::handle_save_buffer);
+
         session.add_request_handler(
             worktree_store.downgrade(),
             WorktreeStore::handle_create_project_entry,
@@ -112,27 +116,6 @@ impl HeadlessProject {
         })
     }
 
-    pub async fn handle_update_buffer(
-        this: Model<Self>,
-        envelope: TypedEnvelope<proto::UpdateBuffer>,
-        mut cx: AsyncAppContext,
-    ) -> Result<proto::Ack> {
-        this.update(&mut cx, |this, cx| {
-            this.buffer_store.update(cx, |buffer_store, cx| {
-                buffer_store.handle_update_buffer(envelope, false, cx)
-            })
-        })?
-    }
-
-    pub async fn handle_save_buffer(
-        this: Model<Self>,
-        envelope: TypedEnvelope<proto::SaveBuffer>,
-        mut cx: AsyncAppContext,
-    ) -> Result<proto::BufferSaved> {
-        let buffer_store = this.update(&mut cx, |this, _| this.buffer_store.clone())?;
-        BufferStore::handle_save_buffer(buffer_store, PROJECT_ID, envelope, cx).await
-    }
-
     pub async fn handle_open_buffer_by_path(
         this: Model<Self>,
         message: TypedEnvelope<proto::OpenBufferByPath>,
@@ -178,33 +161,12 @@ impl HeadlessProject {
         &mut self,
         _: Model<BufferStore>,
         event: &BufferStoreEvent,
-        cx: &mut ModelContext<Self>,
+        _: &mut ModelContext<Self>,
     ) {
         match event {
-            BufferStoreEvent::LocalBufferUpdated { buffer } => {
-                let buffer = buffer.read(cx);
-                let buffer_id = buffer.remote_id();
-                let Some(new_file) = buffer.file() else {
-                    return;
-                };
-                self.session
-                    .send(proto::UpdateBufferFile {
-                        project_id: 0,
-                        buffer_id: buffer_id.into(),
-                        file: Some(new_file.to_proto(cx)),
-                    })
-                    .log_err();
-            }
-            BufferStoreEvent::DiffBaseUpdated { buffer } => {
-                let buffer = buffer.read(cx);
-                let buffer_id = buffer.remote_id();
-                let diff_base = buffer.diff_base();
+            BufferStoreEvent::MessageToReplicas(message) => {
                 self.session
-                    .send(proto::UpdateDiffBase {
-                        project_id: 0,
-                        buffer_id: buffer_id.to_proto(),
-                        diff_base: diff_base.map(|b| b.to_string()),
-                    })
+                    .send_dynamic(message.as_ref().clone())
                     .log_err();
             }
             _ => {}