Start work on reload buffers (#18245)

Conrad Irwin and Mikayla created

Release Notes:

- Fixed: ssh-remote reload buffers

---------

Co-authored-by: Mikayla <mikayla@zed.dev>

Change summary

crates/project/src/buffer_store.rs               | 309 +++++++++++++----
crates/project/src/project.rs                    | 139 -------
crates/remote_server/src/headless_project.rs     |   3 
crates/remote_server/src/remote_editing_tests.rs |  82 ++++
4 files changed, 325 insertions(+), 208 deletions(-)

Detailed changes

crates/project/src/buffer_store.rs 🔗

@@ -14,7 +14,10 @@ use gpui::{
 };
 use http_client::Url;
 use language::{
-    proto::{deserialize_line_ending, deserialize_version, serialize_version, split_operations},
+    proto::{
+        deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version,
+        split_operations,
+    },
     Buffer, BufferEvent, Capability, File as _, Language, Operation,
 };
 use rpc::{proto, AnyProtoClient, ErrorExt as _, TypedEnvelope};
@@ -29,9 +32,8 @@ use worktree::{
 
 /// A set of open buffers.
 pub struct BufferStore {
-    downstream_client: Option<AnyProtoClient>,
-    remote_id: Option<u64>,
-    #[allow(unused)]
+    state: BufferStoreState,
+    downstream_client: Option<(AnyProtoClient, u64)>,
     worktree_store: Model<WorktreeStore>,
     opened_buffers: HashMap<BufferId, OpenBuffer>,
     local_buffer_ids_by_path: HashMap<ProjectPath, BufferId>,
@@ -44,12 +46,11 @@ pub struct BufferStore {
     loading_remote_buffers_by_id: HashMap<BufferId, Model<Buffer>>,
     remote_buffer_listeners:
         HashMap<BufferId, Vec<oneshot::Sender<Result<Model<Buffer>, anyhow::Error>>>>,
-    shared_buffers: HashMap<proto::PeerId, HashSet<BufferId>>,
+    shared_buffers: HashMap<proto::PeerId, HashSet<Model<Buffer>>>,
 }
 
 enum OpenBuffer {
-    Strong(Model<Buffer>),
-    Weak(WeakModel<Buffer>),
+    Buffer(WeakModel<Buffer>),
     Operations(Vec<Operation>),
 }
 
@@ -62,6 +63,15 @@ pub enum BufferStoreEvent {
     },
 }
 
+enum BufferStoreState {
+    Remote {
+        shared_with_me: HashSet<Model<Buffer>>,
+        upstream_client: AnyProtoClient,
+        project_id: u64,
+    },
+    Local {},
+}
+
 #[derive(Default, Debug)]
 pub struct ProjectTransaction(pub HashMap<Model<Buffer>, language::Transaction>);
 
@@ -75,17 +85,36 @@ impl BufferStore {
         client.add_model_message_handler(Self::handle_update_diff_base);
         client.add_model_request_handler(Self::handle_save_buffer);
         client.add_model_request_handler(Self::handle_blame_buffer);
+        client.add_model_request_handler(Self::handle_reload_buffers);
     }
 
     /// Creates a buffer store, optionally retaining its buffers.
-    ///
-    /// If `retain_buffers` is `true`, then buffers are owned by the buffer store
-    /// and won't be released unless they are explicitly removed, or `retain_buffers`
-    /// is set to `false` via `set_retain_buffers`. Otherwise, buffers are stored as
-    /// weak handles.
-    pub fn new(
+    pub fn local(worktree_store: Model<WorktreeStore>, cx: &mut ModelContext<Self>) -> Self {
+        cx.subscribe(&worktree_store, |this, _, event, cx| {
+            if let WorktreeStoreEvent::WorktreeAdded(worktree) = event {
+                this.subscribe_to_worktree(worktree, cx);
+            }
+        })
+        .detach();
+
+        Self {
+            state: BufferStoreState::Local {},
+            downstream_client: None,
+            worktree_store,
+            opened_buffers: Default::default(),
+            remote_buffer_listeners: Default::default(),
+            loading_remote_buffers_by_id: Default::default(),
+            local_buffer_ids_by_path: Default::default(),
+            local_buffer_ids_by_entry_id: Default::default(),
+            loading_buffers_by_path: Default::default(),
+            shared_buffers: Default::default(),
+        }
+    }
+
+    pub fn remote(
         worktree_store: Model<WorktreeStore>,
-        remote_id: Option<u64>,
+        upstream_client: AnyProtoClient,
+        remote_id: u64,
         cx: &mut ModelContext<Self>,
     ) -> Self {
         cx.subscribe(&worktree_store, |this, _, event, cx| {
@@ -96,7 +125,11 @@ impl BufferStore {
         .detach();
 
         Self {
-            remote_id,
+            state: BufferStoreState::Remote {
+                shared_with_me: Default::default(),
+                upstream_client,
+                project_id: remote_id,
+            },
             downstream_client: None,
             worktree_store,
             opened_buffers: Default::default(),
@@ -288,16 +321,14 @@ impl BufferStore {
                         buffer.set_diff_base(diff_base.clone(), cx);
                         buffer.remote_id().to_proto()
                     });
-                    if let Some(project_id) = this.remote_id {
-                        if let Some(client) = &this.downstream_client {
-                            client
-                                .send(proto::UpdateDiffBase {
-                                    project_id,
-                                    buffer_id,
-                                    diff_base,
-                                })
-                                .log_err();
-                        }
+                    if let Some((client, project_id)) = &this.downstream_client {
+                        client
+                            .send(proto::UpdateDiffBase {
+                                project_id: *project_id,
+                                buffer_id,
+                                diff_base,
+                            })
+                            .log_err();
                     }
                 }
             })
@@ -496,8 +527,8 @@ impl BufferStore {
             let new_file = save.await?;
             let mtime = new_file.mtime;
             this.update(&mut cx, |this, cx| {
-                if let Some(downstream_client) = this.downstream_client.as_ref() {
-                    let project_id = this.remote_id.unwrap_or(0);
+                if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() {
+                    let project_id = *project_id;
                     if has_changed_file {
                         downstream_client
                             .send(proto::UpdateBufferFile {
@@ -620,11 +651,7 @@ impl BufferStore {
     fn add_buffer(&mut self, buffer: Model<Buffer>, cx: &mut ModelContext<Self>) -> Result<()> {
         let remote_id = buffer.read(cx).remote_id();
         let is_remote = buffer.read(cx).replica_id() != 0;
-        let open_buffer = if self.remote_id.is_some() {
-            OpenBuffer::Strong(buffer.clone())
-        } else {
-            OpenBuffer::Weak(buffer.downgrade())
-        };
+        let open_buffer = OpenBuffer::Buffer(buffer.downgrade());
 
         let handle = cx.handle().downgrade();
         buffer.update(cx, move |_, cx| {
@@ -768,8 +795,7 @@ impl BufferStore {
     }
 
     pub fn disconnected_from_host(&mut self, cx: &mut AppContext) {
-        self.downstream_client.take();
-        self.set_remote_id(None, cx);
+        self.drop_unnecessary_buffers(cx);
 
         for buffer in self.buffers() {
             buffer.update(cx, |buffer, cx| {
@@ -786,32 +812,20 @@ impl BufferStore {
         &mut self,
         remote_id: u64,
         downstream_client: AnyProtoClient,
-        cx: &mut AppContext,
+        _cx: &mut AppContext,
     ) {
-        self.downstream_client = Some(downstream_client);
-        self.set_remote_id(Some(remote_id), cx);
+        self.downstream_client = Some((downstream_client, remote_id));
     }
 
     pub fn unshared(&mut self, _cx: &mut ModelContext<Self>) {
-        self.remote_id.take();
+        self.downstream_client.take();
+        self.forget_shared_buffers();
     }
 
-    fn set_remote_id(&mut self, remote_id: Option<u64>, cx: &mut AppContext) {
-        self.remote_id = remote_id;
+    fn drop_unnecessary_buffers(&mut self, cx: &mut AppContext) {
         for open_buffer in self.opened_buffers.values_mut() {
-            if remote_id.is_some() {
-                if let OpenBuffer::Weak(buffer) = open_buffer {
-                    if let Some(buffer) = buffer.upgrade() {
-                        *open_buffer = OpenBuffer::Strong(buffer);
-                    }
-                }
-            } else {
-                if let Some(buffer) = open_buffer.upgrade() {
-                    buffer.update(cx, |buffer, _| buffer.give_up_waiting());
-                }
-                if let OpenBuffer::Strong(buffer) = open_buffer {
-                    *open_buffer = OpenBuffer::Weak(buffer.downgrade());
-                }
+            if let Some(buffer) = open_buffer.upgrade() {
+                buffer.update(cx, |buffer, _| buffer.give_up_waiting());
             }
         }
     }
@@ -881,8 +895,26 @@ impl BufferStore {
         event: &BufferEvent,
         cx: &mut ModelContext<Self>,
     ) {
-        if event == &BufferEvent::FileHandleChanged {
-            self.buffer_changed_file(buffer, cx);
+        match event {
+            BufferEvent::FileHandleChanged => {
+                self.buffer_changed_file(buffer, cx);
+            }
+            BufferEvent::Reloaded => {
+                let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else {
+                    return;
+                };
+                let buffer = buffer.read(cx);
+                downstream_client
+                    .send(proto::BufferReloaded {
+                        project_id: *project_id,
+                        buffer_id: buffer.remote_id().to_proto(),
+                        version: serialize_version(&buffer.version()),
+                        mtime: buffer.saved_mtime().map(|t| t.into()),
+                        line_ending: serialize_line_ending(buffer.line_ending()) as i32,
+                    })
+                    .log_err();
+            }
+            _ => {}
         }
     }
 
@@ -986,16 +1018,14 @@ impl BufferStore {
                 }
             }
 
-            if let Some(project_id) = self.remote_id {
-                if let Some(client) = &self.downstream_client {
-                    client
-                        .send(proto::UpdateBufferFile {
-                            project_id,
-                            buffer_id: buffer_id.to_proto(),
-                            file: Some(new_file.to_proto(cx)),
-                        })
-                        .ok();
-                }
+            if let Some((client, project_id)) = &self.downstream_client {
+                client
+                    .send(proto::UpdateBufferFile {
+                        project_id: *project_id,
+                        buffer_id: buffer_id.to_proto(),
+                        file: Some(new_file.to_proto(cx)),
+                    })
+                    .ok();
             }
 
             buffer.file_updated(Arc::new(new_file), cx);
@@ -1050,11 +1080,8 @@ impl BufferStore {
         this.update(&mut cx, |this, cx| {
             match this.opened_buffers.entry(buffer_id) {
                 hash_map::Entry::Occupied(mut e) => match e.get_mut() {
-                    OpenBuffer::Strong(buffer) => {
-                        buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
-                    }
                     OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
-                    OpenBuffer::Weak(buffer) => {
+                    OpenBuffer::Buffer(buffer) => {
                         if let Some(buffer) = buffer.upgrade() {
                             buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx));
                         }
@@ -1090,7 +1117,7 @@ impl BufferStore {
                 self.shared_buffers
                     .entry(guest_id)
                     .or_default()
-                    .insert(buffer_id);
+                    .insert(buffer.clone());
 
                 let buffer = buffer.read(cx);
                 response.buffers.push(proto::BufferVersion {
@@ -1230,6 +1257,19 @@ impl BufferStore {
                     }
                 } else if chunk.is_last {
                     self.loading_remote_buffers_by_id.remove(&buffer_id);
+                    // retain buffers sent by peers to avoid races.
+                    match &mut self.state {
+                        BufferStoreState::Remote {
+                            ref mut shared_with_me,
+                            upstream_client,
+                            ..
+                        } => {
+                            if upstream_client.is_via_collab() {
+                                shared_with_me.insert(buffer.clone());
+                            }
+                        }
+                        _ => {}
+                    }
                     self.add_buffer(buffer, cx)?;
                 }
             }
@@ -1303,7 +1343,10 @@ impl BufferStore {
         let (buffer, project_id) = this.update(&mut cx, |this, _| {
             anyhow::Ok((
                 this.get_existing(buffer_id)?,
-                this.remote_id.context("project is not shared")?,
+                this.downstream_client
+                    .as_ref()
+                    .map(|(_, project_id)| *project_id)
+                    .context("project is not shared")?,
             ))
         })??;
         buffer
@@ -1340,12 +1383,14 @@ impl BufferStore {
         let peer_id = envelope.sender_id;
         let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
         this.update(&mut cx, |this, _| {
-            if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
-                if shared.remove(&buffer_id) {
-                    if shared.is_empty() {
-                        this.shared_buffers.remove(&peer_id);
+            if let Some(buffer) = this.get(buffer_id) {
+                if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
+                    if shared.remove(&buffer) {
+                        if shared.is_empty() {
+                            this.shared_buffers.remove(&peer_id);
+                        }
+                        return;
                     }
-                    return;
                 }
             };
             debug_panic!(
@@ -1429,6 +1474,98 @@ impl BufferStore {
         }
     }
 
+    pub fn reload_buffers(
+        &self,
+        buffers: HashSet<Model<Buffer>>,
+        push_to_history: bool,
+        cx: &mut ModelContext<Self>,
+    ) -> Task<Result<ProjectTransaction>> {
+        let mut local_buffers = Vec::new();
+        let mut remote_buffers = Vec::new();
+        for buffer_handle in buffers {
+            let buffer = buffer_handle.read(cx);
+            if buffer.is_dirty() {
+                if let Some(file) = File::from_dyn(buffer.file()) {
+                    if file.is_local() {
+                        local_buffers.push(buffer_handle);
+                    } else {
+                        remote_buffers.push(buffer_handle);
+                    }
+                }
+            }
+        }
+
+        let client = self.upstream_client();
+
+        cx.spawn(move |this, mut cx| async move {
+            let mut project_transaction = ProjectTransaction::default();
+            if let Some((client, project_id)) = client {
+                let response = client
+                    .request(proto::ReloadBuffers {
+                        project_id,
+                        buffer_ids: remote_buffers
+                            .iter()
+                            .filter_map(|buffer| {
+                                buffer
+                                    .update(&mut cx, |buffer, _| buffer.remote_id().into())
+                                    .ok()
+                            })
+                            .collect(),
+                    })
+                    .await?
+                    .transaction
+                    .ok_or_else(|| anyhow!("missing transaction"))?;
+                BufferStore::deserialize_project_transaction(
+                    this,
+                    response,
+                    push_to_history,
+                    cx.clone(),
+                )
+                .await?;
+            }
+
+            for buffer in local_buffers {
+                let transaction = buffer
+                    .update(&mut cx, |buffer, cx| buffer.reload(cx))?
+                    .await?;
+                buffer.update(&mut cx, |buffer, cx| {
+                    if let Some(transaction) = transaction {
+                        if !push_to_history {
+                            buffer.forget_transaction(transaction.id);
+                        }
+                        project_transaction.0.insert(cx.handle(), transaction);
+                    }
+                })?;
+            }
+
+            Ok(project_transaction)
+        })
+    }
+
+    async fn handle_reload_buffers(
+        this: Model<Self>,
+        envelope: TypedEnvelope<proto::ReloadBuffers>,
+        mut cx: AsyncAppContext,
+    ) -> Result<proto::ReloadBuffersResponse> {
+        let sender_id = envelope.original_sender_id().unwrap_or_default();
+        let reload = this.update(&mut cx, |this, cx| {
+            let mut buffers = HashSet::default();
+            for buffer_id in &envelope.payload.buffer_ids {
+                let buffer_id = BufferId::new(*buffer_id)?;
+                buffers.insert(this.get_existing(buffer_id)?);
+            }
+            Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx))
+        })??;
+
+        let project_transaction = reload.await?;
+        let project_transaction = this.update(&mut cx, |this, cx| {
+            this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
+        })?;
+        Ok(proto::ReloadBuffersResponse {
+            transaction: Some(project_transaction),
+        })
+    }
+
     pub fn create_buffer_for_peer(
         &mut self,
         buffer: &Model<Buffer>,
@@ -1440,12 +1577,12 @@ impl BufferStore {
             .shared_buffers
             .entry(peer_id)
             .or_default()
-            .insert(buffer_id)
+            .insert(buffer.clone())
         {
             return Task::ready(Ok(()));
         }
 
-        let Some((client, project_id)) = self.downstream_client.clone().zip(self.remote_id) else {
+        let Some((client, project_id)) = self.downstream_client.clone() else {
             return Task::ready(Ok(()));
         };
 
@@ -1492,6 +1629,17 @@ impl BufferStore {
         })
     }
 
+    pub fn upstream_client(&self) -> Option<(AnyProtoClient, u64)> {
+        match &self.state {
+            BufferStoreState::Remote {
+                upstream_client,
+                project_id,
+                ..
+            } => Some((upstream_client.clone(), *project_id)),
+            BufferStoreState::Local { .. } => None,
+        }
+    }
+
     pub fn forget_shared_buffers(&mut self) {
         self.shared_buffers.clear();
     }
@@ -1506,7 +1654,7 @@ impl BufferStore {
         }
     }
 
-    pub fn shared_buffers(&self) -> &HashMap<proto::PeerId, HashSet<BufferId>> {
+    pub fn shared_buffers(&self) -> &HashMap<proto::PeerId, HashSet<Model<Buffer>>> {
         &self.shared_buffers
     }
 
@@ -1572,8 +1720,7 @@ impl BufferStore {
 impl OpenBuffer {
     fn upgrade(&self) -> Option<Model<Buffer>> {
         match self {
-            OpenBuffer::Strong(handle) => Some(handle.clone()),
-            OpenBuffer::Weak(handle) => handle.upgrade(),
+            OpenBuffer::Buffer(handle) => handle.upgrade(),
             OpenBuffer::Operations(_) => None,
         }
     }

crates/project/src/project.rs 🔗

@@ -42,10 +42,7 @@ use gpui::{
 use itertools::Itertools;
 use language::{
     language_settings::InlayHintKind,
-    proto::{
-        deserialize_anchor, serialize_anchor, serialize_line_ending, serialize_version,
-        split_operations,
-    },
+    proto::{deserialize_anchor, serialize_anchor, split_operations},
     Buffer, BufferEvent, CachedLspAdapter, Capability, CodeLabel, ContextProvider, DiagnosticEntry,
     Documentation, File as _, Language, LanguageRegistry, LanguageServerName, PointUtf16, ToOffset,
     ToPointUtf16, Transaction, Unclipped,
@@ -559,7 +556,6 @@ impl Project {
         client.add_model_message_handler(Self::handle_unshare_project);
         client.add_model_request_handler(Self::handle_update_buffer);
         client.add_model_message_handler(Self::handle_update_worktree);
-        client.add_model_request_handler(Self::handle_reload_buffers);
         client.add_model_request_handler(Self::handle_synchronize_buffers);
 
         client.add_model_request_handler(Self::handle_search_project);
@@ -599,8 +595,7 @@ impl Project {
             cx.subscribe(&worktree_store, Self::on_worktree_store_event)
                 .detach();
 
-            let buffer_store =
-                cx.new_model(|cx| BufferStore::new(worktree_store.clone(), None, cx));
+            let buffer_store = cx.new_model(|cx| BufferStore::local(worktree_store.clone(), cx));
             cx.subscribe(&buffer_store, Self::on_buffer_store_event)
                 .detach();
 
@@ -695,8 +690,14 @@ impl Project {
             cx.subscribe(&worktree_store, Self::on_worktree_store_event)
                 .detach();
 
-            let buffer_store =
-                cx.new_model(|cx| BufferStore::new(worktree_store.clone(), None, cx));
+            let buffer_store = cx.new_model(|cx| {
+                BufferStore::remote(
+                    worktree_store.clone(),
+                    ssh.clone().into(),
+                    SSH_PROJECT_ID,
+                    cx,
+                )
+            });
             cx.subscribe(&buffer_store, Self::on_buffer_store_event)
                 .detach();
 
@@ -851,8 +852,9 @@ impl Project {
                     .map(DevServerProjectId),
             )
         })?;
-        let buffer_store =
-            cx.new_model(|cx| BufferStore::new(worktree_store.clone(), Some(remote_id), cx))?;
+        let buffer_store = cx.new_model(|cx| {
+            BufferStore::remote(worktree_store.clone(), client.clone().into(), remote_id, cx)
+        })?;
 
         let lsp_store = cx.new_model(|cx| {
             let mut lsp_store = LspStore::new_remote(
@@ -2167,23 +2169,6 @@ impl Project {
                 .ok();
             }
 
-            BufferEvent::Reloaded => {
-                if self.is_local_or_ssh() {
-                    if let Some(project_id) = self.remote_id() {
-                        let buffer = buffer.read(cx);
-                        self.client
-                            .send(proto::BufferReloaded {
-                                project_id,
-                                buffer_id: buffer.remote_id().to_proto(),
-                                version: serialize_version(&buffer.version()),
-                                mtime: buffer.saved_mtime().map(|t| t.into()),
-                                line_ending: serialize_line_ending(buffer.line_ending()) as i32,
-                            })
-                            .log_err();
-                    }
-                }
-            }
-
             _ => {}
         }
 
@@ -2347,67 +2332,8 @@ impl Project {
         push_to_history: bool,
         cx: &mut ModelContext<Self>,
     ) -> Task<Result<ProjectTransaction>> {
-        let mut local_buffers = Vec::new();
-        let mut remote_buffers = None;
-        for buffer_handle in buffers {
-            let buffer = buffer_handle.read(cx);
-            if buffer.is_dirty() {
-                if let Some(file) = File::from_dyn(buffer.file()) {
-                    if file.is_local() {
-                        local_buffers.push(buffer_handle);
-                    } else {
-                        remote_buffers.get_or_insert(Vec::new()).push(buffer_handle);
-                    }
-                }
-            }
-        }
-
-        let remote_buffers = self.remote_id().zip(remote_buffers);
-        let client = self.client.clone();
-
-        cx.spawn(move |this, mut cx| async move {
-            let mut project_transaction = ProjectTransaction::default();
-
-            if let Some((project_id, remote_buffers)) = remote_buffers {
-                let response = client
-                    .request(proto::ReloadBuffers {
-                        project_id,
-                        buffer_ids: remote_buffers
-                            .iter()
-                            .filter_map(|buffer| {
-                                buffer
-                                    .update(&mut cx, |buffer, _| buffer.remote_id().into())
-                                    .ok()
-                            })
-                            .collect(),
-                    })
-                    .await?
-                    .transaction
-                    .ok_or_else(|| anyhow!("missing transaction"))?;
-                BufferStore::deserialize_project_transaction(
-                    this.read_with(&cx, |this, _| this.buffer_store.downgrade())?,
-                    response,
-                    push_to_history,
-                    cx.clone(),
-                )
-                .await?;
-            }
-
-            for buffer in local_buffers {
-                let transaction = buffer
-                    .update(&mut cx, |buffer, cx| buffer.reload(cx))?
-                    .await?;
-                buffer.update(&mut cx, |buffer, cx| {
-                    if let Some(transaction) = transaction {
-                        if !push_to_history {
-                            buffer.forget_transaction(transaction.id);
-                        }
-                        project_transaction.0.insert(cx.handle(), transaction);
-                    }
-                })?;
-            }
-
-            Ok(project_transaction)
+        self.buffer_store.update(cx, |buffer_store, cx| {
+            buffer_store.reload_buffers(buffers, push_to_history, cx)
         })
     }
 
@@ -3589,30 +3515,6 @@ impl Project {
         })?
     }
 
-    async fn handle_reload_buffers(
-        this: Model<Self>,
-        envelope: TypedEnvelope<proto::ReloadBuffers>,
-        mut cx: AsyncAppContext,
-    ) -> Result<proto::ReloadBuffersResponse> {
-        let sender_id = envelope.original_sender_id()?;
-        let reload = this.update(&mut cx, |this, cx| {
-            let mut buffers = HashSet::default();
-            for buffer_id in &envelope.payload.buffer_ids {
-                let buffer_id = BufferId::new(*buffer_id)?;
-                buffers.insert(this.buffer_store.read(cx).get_existing(buffer_id)?);
-            }
-            Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx))
-        })??;
-
-        let project_transaction = reload.await?;
-        let project_transaction = this.update(&mut cx, |this, cx| {
-            this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx)
-        })?;
-        Ok(proto::ReloadBuffersResponse {
-            transaction: Some(project_transaction),
-        })
-    }
-
     async fn handle_synchronize_buffers(
         this: Model<Self>,
         envelope: TypedEnvelope<proto::SynchronizeBuffers>,
@@ -3896,17 +3798,6 @@ impl Project {
         })?
     }
 
-    fn serialize_project_transaction_for_peer(
-        &mut self,
-        project_transaction: ProjectTransaction,
-        peer_id: proto::PeerId,
-        cx: &mut AppContext,
-    ) -> proto::ProjectTransaction {
-        self.buffer_store.update(cx, |buffer_store, cx| {
-            buffer_store.serialize_project_transaction_for_peer(project_transaction, peer_id, cx)
-        })
-    }
-
     fn create_buffer_for_peer(
         &mut self,
         buffer: &Model<Buffer>,

crates/remote_server/src/headless_project.rs 🔗

@@ -50,8 +50,7 @@ impl HeadlessProject {
             store
         });
         let buffer_store = cx.new_model(|cx| {
-            let mut buffer_store =
-                BufferStore::new(worktree_store.clone(), Some(SSH_PROJECT_ID), cx);
+            let mut buffer_store = BufferStore::local(worktree_store.clone(), cx);
             buffer_store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
             buffer_store
         });

crates/remote_server/src/remote_editing_tests.rs 🔗

@@ -7,6 +7,7 @@ use http_client::FakeHttpClient;
 use language::{
     language_settings::{all_language_settings, AllLanguageSettings},
     Buffer, FakeLspAdapter, LanguageConfig, LanguageMatcher, LanguageRegistry, LanguageServerName,
+    LineEnding,
 };
 use lsp::{CompletionContext, CompletionResponse, CompletionTriggerKind};
 use node_runtime::NodeRuntime;
@@ -18,7 +19,10 @@ use remote::SshSession;
 use serde_json::json;
 use settings::{Settings, SettingsLocation, SettingsStore};
 use smol::stream::StreamExt;
-use std::{path::Path, sync::Arc};
+use std::{
+    path::{Path, PathBuf},
+    sync::Arc,
+};
 
 #[gpui::test]
 async fn test_basic_remote_editing(cx: &mut TestAppContext, server_cx: &mut TestAppContext) {
@@ -440,6 +444,54 @@ async fn test_remote_lsp(cx: &mut TestAppContext, server_cx: &mut TestAppContext
     })
 }
 
+#[gpui::test]
+async fn test_remote_reload(cx: &mut TestAppContext, server_cx: &mut TestAppContext) {
+    let (project, _headless, fs) = init_test(cx, server_cx).await;
+    let (worktree, _) = project
+        .update(cx, |project, cx| {
+            project.find_or_create_worktree("/code/project1", true, cx)
+        })
+        .await
+        .unwrap();
+
+    let worktree_id = cx.update(|cx| worktree.read(cx).id());
+
+    let buffer = project
+        .update(cx, |project, cx| {
+            project.open_buffer((worktree_id, Path::new("src/lib.rs")), cx)
+        })
+        .await
+        .unwrap();
+    buffer.update(cx, |buffer, cx| {
+        buffer.edit([(0..0, "a")], None, cx);
+    });
+
+    fs.save(
+        &PathBuf::from("/code/project1/src/lib.rs"),
+        &("bloop".to_string().into()),
+        LineEnding::Unix,
+    )
+    .await
+    .unwrap();
+
+    cx.run_until_parked();
+    cx.update(|cx| {
+        assert!(buffer.read(cx).has_conflict());
+    });
+
+    project
+        .update(cx, |project, cx| {
+            project.reload_buffers([buffer.clone()].into_iter().collect(), false, cx)
+        })
+        .await
+        .unwrap();
+    cx.run_until_parked();
+
+    cx.update(|cx| {
+        assert!(!buffer.read(cx).has_conflict());
+    });
+}
+
 #[gpui::test]
 async fn test_remote_resolve_file_path(cx: &mut TestAppContext, server_cx: &mut TestAppContext) {
     let (project, _headless, _fs) = init_test(cx, server_cx).await;
@@ -483,6 +535,34 @@ async fn test_remote_resolve_file_path(cx: &mut TestAppContext, server_cx: &mut
     );
 }
 
+#[gpui::test(iterations = 10)]
+async fn test_canceling_buffer_opening(cx: &mut TestAppContext, server_cx: &mut TestAppContext) {
+    let (project, _headless, _fs) = init_test(cx, server_cx).await;
+    let (worktree, _) = project
+        .update(cx, |project, cx| {
+            project.find_or_create_worktree("/code/project1", true, cx)
+        })
+        .await
+        .unwrap();
+    let worktree_id = worktree.read_with(cx, |tree, _| tree.id());
+
+    // Open a buffer on the client but cancel after a random amount of time.
+    let buffer = project.update(cx, |p, cx| p.open_buffer((worktree_id, "src/lib.rs"), cx));
+    cx.executor().simulate_random_delay().await;
+    drop(buffer);
+
+    // Try opening the same buffer again as the client, and ensure we can
+    // still do it despite the cancellation above.
+    let buffer = project
+        .update(cx, |p, cx| p.open_buffer((worktree_id, "src/lib.rs"), cx))
+        .await
+        .unwrap();
+
+    buffer.read_with(cx, |buf, _| {
+        assert_eq!(buf.text(), "fn one() -> usize { 1 }")
+    });
+}
+
 fn init_logger() {
     if std::env::var("RUST_LOG").is_ok() {
         env_logger::try_init().ok();