Move shared_buffers into BufferStore (#17020)

Conrad Irwin created

This also updates the SSH protocol (but not yet collab) to more closely
track which buffers are open on the client.

Release Notes:

- N/A

Change summary

crates/project/src/buffer_store.rs               | 256 ++++++++++++++---
crates/project/src/project.rs                    | 186 +++---------
crates/proto/proto/zed.proto                     |   9 
crates/proto/src/proto.rs                        |   3 
crates/remote_server/src/headless_project.rs     |  40 +-
crates/remote_server/src/remote_editing_tests.rs |  70 +++-
6 files changed, 333 insertions(+), 231 deletions(-)

Detailed changes

crates/project/src/buffer_store.rs 🔗

@@ -4,6 +4,7 @@ use crate::{
     Item, NoRepositoryError, ProjectPath,
 };
 use anyhow::{anyhow, Context as _, Result};
+use client::Client;
 use collections::{hash_map, HashMap, HashSet};
 use fs::Fs;
 use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt};
@@ -17,13 +18,13 @@ use language::{
     Buffer, Capability, Event as BufferEvent, File as _, Language, Operation,
 };
 use rpc::{
-    proto::{self, AnyProtoClient, EnvelopedMessage, PeerId},
+    proto::{self, AnyProtoClient, EnvelopedMessage},
     ErrorExt as _, TypedEnvelope,
 };
 use smol::channel::Receiver;
 use std::{io, path::Path, str::FromStr as _, sync::Arc};
 use text::BufferId;
-use util::{debug_panic, maybe, ResultExt as _};
+use util::{debug_panic, maybe, ResultExt as _, TryFutureExt};
 use worktree::{
     File, PathChange, ProjectEntryId, RemoteWorktree, UpdatedGitRepositoriesSet, Worktree,
     WorktreeId,
@@ -45,6 +46,7 @@ pub struct BufferStore {
     loading_remote_buffers_by_id: HashMap<BufferId, Model<Buffer>>,
     remote_buffer_listeners:
         HashMap<BufferId, Vec<oneshot::Sender<Result<Model<Buffer>, anyhow::Error>>>>,
+    shared_buffers: HashMap<proto::PeerId, HashSet<BufferId>>,
 }
 
 enum OpenBuffer {
@@ -55,6 +57,7 @@ enum OpenBuffer {
 
 pub enum BufferStoreEvent {
     BufferAdded(Model<Buffer>),
+    BufferDropped(BufferId),
     BufferChangedFilePath {
         buffer: Model<Buffer>,
         old_file: Option<Arc<dyn language::File>>,
@@ -93,6 +96,7 @@ impl BufferStore {
             local_buffer_ids_by_path: Default::default(),
             local_buffer_ids_by_entry_id: Default::default(),
             loading_buffers_by_path: Default::default(),
+            shared_buffers: Default::default(),
         }
     }
 
@@ -613,6 +617,18 @@ impl BufferStore {
             OpenBuffer::Weak(buffer.downgrade())
         };
 
+        let handle = cx.handle().downgrade();
+        buffer.update(cx, move |_, cx| {
+            cx.on_release(move |buffer, cx| {
+                handle
+                    .update(cx, |_, cx| {
+                        cx.emit(BufferStoreEvent::BufferDropped(buffer.remote_id()))
+                    })
+                    .ok();
+            })
+            .detach()
+        });
+
         match self.opened_buffers.entry(remote_id) {
             hash_map::Entry::Vacant(entry) => {
                 entry.insert(open_buffer);
@@ -997,55 +1013,6 @@ impl BufferStore {
         Some(())
     }
 
-    pub async fn create_buffer_for_peer(
-        this: Model<Self>,
-        peer_id: PeerId,
-        buffer_id: BufferId,
-        project_id: u64,
-        client: AnyProtoClient,
-        cx: &mut AsyncAppContext,
-    ) -> Result<()> {
-        let Some(buffer) = this.update(cx, |this, _| this.get(buffer_id))? else {
-            return Ok(());
-        };
-
-        let operations = buffer.update(cx, |b, cx| b.serialize_ops(None, cx))?;
-        let operations = operations.await;
-        let state = buffer.update(cx, |buffer, cx| buffer.to_proto(cx))?;
-
-        let initial_state = proto::CreateBufferForPeer {
-            project_id,
-            peer_id: Some(peer_id),
-            variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
-        };
-
-        if client.send(initial_state).log_err().is_some() {
-            let client = client.clone();
-            cx.background_executor()
-                .spawn(async move {
-                    let mut chunks = split_operations(operations).peekable();
-                    while let Some(chunk) = chunks.next() {
-                        let is_last = chunks.peek().is_none();
-                        client.send(proto::CreateBufferForPeer {
-                            project_id,
-                            peer_id: Some(peer_id),
-                            variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
-                                proto::BufferChunk {
-                                    buffer_id: buffer_id.into(),
-                                    operations: chunk,
-                                    is_last,
-                                },
-                            )),
-                        })?;
-                    }
-                    anyhow::Ok(())
-                })
-                .await
-                .log_err();
-        }
-        Ok(())
-    }
-
     pub async fn handle_update_buffer(
         this: Model<Self>,
         envelope: TypedEnvelope<proto::UpdateBuffer>,
@@ -1075,6 +1042,90 @@ impl BufferStore {
         })?
     }
 
+    pub fn handle_synchronize_buffers(
+        &mut self,
+        envelope: TypedEnvelope<proto::SynchronizeBuffers>,
+        cx: &mut ModelContext<Self>,
+        client: Arc<Client>,
+    ) -> Result<proto::SynchronizeBuffersResponse> {
+        let project_id = envelope.payload.project_id;
+        let mut response = proto::SynchronizeBuffersResponse {
+            buffers: Default::default(),
+        };
+        let Some(guest_id) = envelope.original_sender_id else {
+            anyhow::bail!("missing original_sender_id on SynchronizeBuffers request");
+        };
+
+        self.shared_buffers.entry(guest_id).or_default().clear();
+        for buffer in envelope.payload.buffers {
+            let buffer_id = BufferId::new(buffer.id)?;
+            let remote_version = language::proto::deserialize_version(&buffer.version);
+            if let Some(buffer) = self.get(buffer_id) {
+                self.shared_buffers
+                    .entry(guest_id)
+                    .or_default()
+                    .insert(buffer_id);
+
+                let buffer = buffer.read(cx);
+                response.buffers.push(proto::BufferVersion {
+                    id: buffer_id.into(),
+                    version: language::proto::serialize_version(&buffer.version),
+                });
+
+                let operations = buffer.serialize_ops(Some(remote_version), cx);
+                let client = client.clone();
+                if let Some(file) = buffer.file() {
+                    client
+                        .send(proto::UpdateBufferFile {
+                            project_id,
+                            buffer_id: buffer_id.into(),
+                            file: Some(file.to_proto(cx)),
+                        })
+                        .log_err();
+                }
+
+                client
+                    .send(proto::UpdateDiffBase {
+                        project_id,
+                        buffer_id: buffer_id.into(),
+                        diff_base: buffer.diff_base().map(ToString::to_string),
+                    })
+                    .log_err();
+
+                client
+                    .send(proto::BufferReloaded {
+                        project_id,
+                        buffer_id: buffer_id.into(),
+                        version: language::proto::serialize_version(buffer.saved_version()),
+                        mtime: buffer.saved_mtime().map(|time| time.into()),
+                        line_ending: language::proto::serialize_line_ending(buffer.line_ending())
+                            as i32,
+                    })
+                    .log_err();
+
+                cx.background_executor()
+                    .spawn(
+                        async move {
+                            let operations = operations.await;
+                            for chunk in split_operations(operations) {
+                                client
+                                    .request(proto::UpdateBuffer {
+                                        project_id,
+                                        buffer_id: buffer_id.into(),
+                                        operations: chunk,
+                                    })
+                                    .await?;
+                            }
+                            anyhow::Ok(())
+                        }
+                        .log_err(),
+                    )
+                    .detach();
+            }
+        }
+        Ok(response)
+    }
+
     pub fn handle_create_buffer_for_peer(
         &mut self,
         envelope: TypedEnvelope<proto::CreateBufferForPeer>,
@@ -1254,6 +1305,30 @@ impl BufferStore {
         })
     }
 
+    pub async fn handle_close_buffer(
+        this: Model<Self>,
+        envelope: TypedEnvelope<proto::CloseBuffer>,
+        mut cx: AsyncAppContext,
+    ) -> Result<()> {
+        let peer_id = envelope.sender_id;
+        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
+        this.update(&mut cx, |this, _| {
+            if let Some(shared) = this.shared_buffers.get_mut(&peer_id) {
+                if shared.remove(&buffer_id) {
+                    if shared.is_empty() {
+                        this.shared_buffers.remove(&peer_id);
+                    }
+                    return;
+                }
+            };
+            debug_panic!(
+                "peer_id {} closed buffer_id {} which was either not open or already closed",
+                peer_id,
+                buffer_id
+            )
+        })
+    }
+
     pub async fn handle_buffer_saved(
         this: Model<Self>,
         envelope: TypedEnvelope<proto::BufferSaved>,
@@ -1326,6 +1401,85 @@ impl BufferStore {
             receiver.next().await;
         }
     }
+
+    pub fn create_buffer_for_peer(
+        &mut self,
+        buffer: &Model<Buffer>,
+        peer_id: proto::PeerId,
+        project_id: u64,
+        client: AnyProtoClient,
+        cx: &mut ModelContext<Self>,
+    ) -> Task<Result<()>> {
+        let buffer_id = buffer.read(cx).remote_id();
+        if !self
+            .shared_buffers
+            .entry(peer_id)
+            .or_default()
+            .insert(buffer_id)
+        {
+            return Task::ready(Ok(()));
+        }
+
+        cx.spawn(|this, mut cx| async move {
+            let Some(buffer) = this.update(&mut cx, |this, _| this.get(buffer_id))? else {
+                return anyhow::Ok(());
+            };
+
+            let operations = buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?;
+            let operations = operations.await;
+            let state = buffer.update(&mut cx, |buffer, cx| buffer.to_proto(cx))?;
+
+            let initial_state = proto::CreateBufferForPeer {
+                project_id,
+                peer_id: Some(peer_id),
+                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
+            };
+
+            if client.send(initial_state).log_err().is_some() {
+                let client = client.clone();
+                cx.background_executor()
+                    .spawn(async move {
+                        let mut chunks = split_operations(operations).peekable();
+                        while let Some(chunk) = chunks.next() {
+                            let is_last = chunks.peek().is_none();
+                            client.send(proto::CreateBufferForPeer {
+                                project_id,
+                                peer_id: Some(peer_id),
+                                variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
+                                    proto::BufferChunk {
+                                        buffer_id: buffer_id.into(),
+                                        operations: chunk,
+                                        is_last,
+                                    },
+                                )),
+                            })?;
+                        }
+                        anyhow::Ok(())
+                    })
+                    .await
+                    .log_err();
+            }
+            Ok(())
+        })
+    }
+
+    pub fn forget_shared_buffers(&mut self) {
+        self.shared_buffers.clear();
+    }
+
+    pub fn forget_shared_buffers_for(&mut self, peer_id: &proto::PeerId) {
+        self.shared_buffers.remove(peer_id);
+    }
+
+    pub fn update_peer_id(&mut self, old_peer_id: &proto::PeerId, new_peer_id: proto::PeerId) {
+        if let Some(buffers) = self.shared_buffers.remove(old_peer_id) {
+            self.shared_buffers.insert(new_peer_id, buffers);
+        }
+    }
+
+    pub fn shared_buffers(&self) -> &HashMap<proto::PeerId, HashSet<BufferId>> {
+        &self.shared_buffers
+    }
 }
 
 impl OpenBuffer {

crates/project/src/project.rs 🔗

@@ -16,7 +16,7 @@ mod project_tests;
 pub mod search_history;
 mod yarn;
 
-use anyhow::{anyhow, bail, Context as _, Result};
+use anyhow::{anyhow, Context as _, Result};
 use async_trait::async_trait;
 use buffer_store::{BufferStore, BufferStoreEvent};
 use client::{
@@ -208,7 +208,6 @@ pub struct Project {
     worktree_store: Model<WorktreeStore>,
     buffer_store: Model<BufferStore>,
     _subscriptions: Vec<gpui::Subscription>,
-    shared_buffers: HashMap<proto::PeerId, HashSet<BufferId>>,
     #[allow(clippy::type_complexity)]
     loading_worktrees:
         HashMap<Arc<Path>, Shared<Task<Result<Model<Worktree>, Arc<anyhow::Error>>>>>,
@@ -807,7 +806,6 @@ impl Project {
                 collaborators: Default::default(),
                 worktree_store,
                 buffer_store,
-                shared_buffers: Default::default(),
                 loading_worktrees: Default::default(),
                 buffer_snapshots: Default::default(),
                 join_project_response_message_id: 0,
@@ -979,7 +977,6 @@ impl Project {
                 buffer_ordered_messages_tx: tx,
                 buffer_store: buffer_store.clone(),
                 worktree_store,
-                shared_buffers: Default::default(),
                 loading_worktrees: Default::default(),
                 active_entry: None,
                 collaborators: Default::default(),
@@ -1729,7 +1726,8 @@ impl Project {
         message: proto::ResharedProject,
         cx: &mut ModelContext<Self>,
     ) -> Result<()> {
-        self.shared_buffers.clear();
+        self.buffer_store
+            .update(cx, |buffer_store, _| buffer_store.forget_shared_buffers());
         self.set_collaborators_from_proto(message.collaborators, cx)?;
         self.metadata_changed(cx);
         cx.emit(Event::Reshared);
@@ -1799,13 +1797,14 @@ impl Project {
         if let ProjectClientState::Shared { remote_id, .. } = self.client_state {
             self.client_state = ProjectClientState::Local;
             self.collaborators.clear();
-            self.shared_buffers.clear();
             self.client_subscriptions.clear();
             self.worktree_store.update(cx, |store, cx| {
                 store.set_shared(false, cx);
             });
-            self.buffer_store
-                .update(cx, |buffer_store, cx| buffer_store.set_remote_id(None, cx));
+            self.buffer_store.update(cx, |buffer_store, cx| {
+                buffer_store.forget_shared_buffers();
+                buffer_store.set_remote_id(None, cx)
+            });
             self.client
                 .send(proto::UnshareProject {
                     project_id: remote_id,
@@ -2421,6 +2420,16 @@ impl Project {
             BufferStoreEvent::MessageToReplicas(message) => {
                 self.client.send_dynamic(message.as_ref().clone()).log_err();
             }
+            BufferStoreEvent::BufferDropped(buffer_id) => {
+                if let Some(ref ssh_session) = self.ssh_session {
+                    ssh_session
+                        .send(proto::CloseBuffer {
+                            project_id: 0,
+                            buffer_id: buffer_id.to_proto(),
+                        })
+                        .log_err();
+                }
+            }
         }
     }
 
@@ -7317,7 +7326,7 @@ impl Project {
             query: Some(query.to_proto()),
             limit: limit as _,
         });
-        let guard = self.retain_remotely_created_buffers();
+        let guard = self.retain_remotely_created_buffers(cx);
 
         cx.spawn(move |this, mut cx| async move {
             let response = request.await?;
@@ -8543,7 +8552,9 @@ impl Project {
 
         let collaborator = Collaborator::from_proto(collaborator)?;
         this.update(&mut cx, |this, cx| {
-            this.shared_buffers.remove(&collaborator.peer_id);
+            this.buffer_store.update(cx, |buffer_store, _| {
+                buffer_store.forget_shared_buffers_for(&collaborator.peer_id);
+            });
             cx.emit(Event::CollaboratorJoined(collaborator.peer_id));
             this.collaborators
                 .insert(collaborator.peer_id, collaborator);
@@ -8574,16 +8585,10 @@ impl Project {
             let is_host = collaborator.replica_id == 0;
             this.collaborators.insert(new_peer_id, collaborator);
 
-            let buffers = this.shared_buffers.remove(&old_peer_id);
-            log::info!(
-                "peer {} became {}. moving buffers {:?}",
-                old_peer_id,
-                new_peer_id,
-                &buffers
-            );
-            if let Some(buffers) = buffers {
-                this.shared_buffers.insert(new_peer_id, buffers);
-            }
+            log::info!("peer {} became {}", old_peer_id, new_peer_id,);
+            this.buffer_store.update(cx, |buffer_store, _| {
+                buffer_store.update_peer_id(&old_peer_id, new_peer_id)
+            });
 
             if is_host {
                 this.buffer_store
@@ -8618,11 +8623,11 @@ impl Project {
                 .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?
                 .replica_id;
             this.buffer_store.update(cx, |buffer_store, cx| {
+                buffer_store.forget_shared_buffers_for(&peer_id);
                 for buffer in buffer_store.buffers() {
                     buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
                 }
             });
-            this.shared_buffers.remove(&peer_id);
 
             cx.emit(Event::CollaboratorLeft(peer_id));
             cx.notify();
@@ -8835,8 +8840,17 @@ impl Project {
         BufferStore::handle_update_buffer(buffer_store, envelope, cx).await
     }
 
-    fn retain_remotely_created_buffers(&mut self) -> RemotelyCreatedBufferGuard {
-        self.remotely_created_buffers.lock().retain_count += 1;
+    fn retain_remotely_created_buffers(
+        &mut self,
+        cx: &mut ModelContext<Self>,
+    ) -> RemotelyCreatedBufferGuard {
+        {
+            let mut remotely_created_buffers = self.remotely_created_buffers.lock();
+            if remotely_created_buffers.retain_count == 0 {
+                remotely_created_buffers.buffers = self.buffer_store.read(cx).buffers().collect();
+            }
+            remotely_created_buffers.retain_count += 1;
+        }
         RemotelyCreatedBufferGuard {
             remote_buffers: Arc::downgrade(&self.remotely_created_buffers),
         }
@@ -8888,86 +8902,11 @@ impl Project {
         envelope: TypedEnvelope<proto::SynchronizeBuffers>,
         mut cx: AsyncAppContext,
     ) -> Result<proto::SynchronizeBuffersResponse> {
-        let project_id = envelope.payload.project_id;
-        let mut response = proto::SynchronizeBuffersResponse {
-            buffers: Default::default(),
-        };
-
-        this.update(&mut cx, |this, cx| {
-            let Some(guest_id) = envelope.original_sender_id else {
-                error!("missing original_sender_id on SynchronizeBuffers request");
-                bail!("missing original_sender_id on SynchronizeBuffers request");
-            };
-
-            this.shared_buffers.entry(guest_id).or_default().clear();
-            for buffer in envelope.payload.buffers {
-                let buffer_id = BufferId::new(buffer.id)?;
-                let remote_version = language::proto::deserialize_version(&buffer.version);
-                if let Some(buffer) = this.buffer_for_id(buffer_id, cx) {
-                    this.shared_buffers
-                        .entry(guest_id)
-                        .or_default()
-                        .insert(buffer_id);
-
-                    let buffer = buffer.read(cx);
-                    response.buffers.push(proto::BufferVersion {
-                        id: buffer_id.into(),
-                        version: language::proto::serialize_version(&buffer.version),
-                    });
-
-                    let operations = buffer.serialize_ops(Some(remote_version), cx);
-                    let client = this.client.clone();
-                    if let Some(file) = buffer.file() {
-                        client
-                            .send(proto::UpdateBufferFile {
-                                project_id,
-                                buffer_id: buffer_id.into(),
-                                file: Some(file.to_proto(cx)),
-                            })
-                            .log_err();
-                    }
-
-                    client
-                        .send(proto::UpdateDiffBase {
-                            project_id,
-                            buffer_id: buffer_id.into(),
-                            diff_base: buffer.diff_base().map(ToString::to_string),
-                        })
-                        .log_err();
-
-                    client
-                        .send(proto::BufferReloaded {
-                            project_id,
-                            buffer_id: buffer_id.into(),
-                            version: language::proto::serialize_version(buffer.saved_version()),
-                            mtime: buffer.saved_mtime().map(|time| time.into()),
-                            line_ending: language::proto::serialize_line_ending(
-                                buffer.line_ending(),
-                            ) as i32,
-                        })
-                        .log_err();
-
-                    cx.background_executor()
-                        .spawn(
-                            async move {
-                                let operations = operations.await;
-                                for chunk in split_operations(operations) {
-                                    client
-                                        .request(proto::UpdateBuffer {
-                                            project_id,
-                                            buffer_id: buffer_id.into(),
-                                            operations: chunk,
-                                        })
-                                        .await?;
-                                }
-                                anyhow::Ok(())
-                            }
-                            .log_err(),
-                        )
-                        .detach();
-                }
-            }
-            Ok(())
+        let response = this.update(&mut cx, |this, cx| {
+            let client = this.client.clone();
+            this.buffer_store.update(cx, |this, cx| {
+                this.handle_synchronize_buffers(envelope, cx, client)
+            })
         })??;
 
         Ok(response)
@@ -9795,35 +9734,20 @@ impl Project {
         peer_id: proto::PeerId,
         cx: &mut AppContext,
     ) -> BufferId {
-        let buffer_id = buffer.read(cx).remote_id();
-        if !self
-            .shared_buffers
-            .entry(peer_id)
-            .or_default()
-            .insert(buffer_id)
-        {
-            return buffer_id;
+        if let Some(project_id) = self.remote_id() {
+            self.buffer_store
+                .update(cx, |buffer_store, cx| {
+                    buffer_store.create_buffer_for_peer(
+                        buffer,
+                        peer_id,
+                        project_id,
+                        self.client.clone().into(),
+                        cx,
+                    )
+                })
+                .detach_and_log_err(cx);
         }
-        let ProjectClientState::Shared { remote_id } = self.client_state else {
-            return buffer_id;
-        };
-        let buffer_store = self.buffer_store.clone();
-        let client = self.client().clone();
-
-        cx.spawn(|mut cx| async move {
-            BufferStore::create_buffer_for_peer(
-                buffer_store,
-                peer_id,
-                buffer_id,
-                remote_id,
-                client.clone().into(),
-                &mut cx,
-            )
-            .await?;
-            anyhow::Ok(())
-        })
-        .detach_and_log_err(cx);
-        buffer_id
+        buffer.read(cx).remote_id()
     }
 
     fn wait_for_remote_buffer(

crates/proto/proto/zed.proto 🔗

@@ -278,7 +278,9 @@ message Envelope {
         LspExtSwitchSourceHeaderResponse lsp_ext_switch_source_header_response = 242;
 
         FindSearchCandidates find_search_candidates = 243;
-        FindSearchCandidatesResponse find_search_candidates_response = 244; // current max
+        FindSearchCandidatesResponse find_search_candidates_response = 244;
+
+        CloseBuffer close_buffer = 245; // current max
     }
 
     reserved 158 to 161;
@@ -870,6 +872,11 @@ message SaveBuffer {
     optional ProjectPath new_path = 4;
 }
 
+message CloseBuffer {
+    uint64 project_id = 1;
+    uint64 buffer_id = 2;
+}
+
 message ProjectPath {
     uint64 worktree_id = 1;
     string path = 2;

crates/proto/src/proto.rs 🔗

@@ -411,7 +411,8 @@ messages!(
     (AddWorktree, Foreground),
     (AddWorktreeResponse, Foreground),
     (FindSearchCandidates, Background),
-    (FindSearchCandidatesResponse, Background)
+    (FindSearchCandidatesResponse, Background),
+    (CloseBuffer, Foreground)
 );
 
 request_messages!(

crates/remote_server/src/headless_project.rs 🔗

@@ -55,6 +55,7 @@ impl HeadlessProject {
         session.add_request_handler(buffer_store.downgrade(), BufferStore::handle_blame_buffer);
         session.add_request_handler(buffer_store.downgrade(), BufferStore::handle_update_buffer);
         session.add_request_handler(buffer_store.downgrade(), BufferStore::handle_save_buffer);
+        session.add_message_handler(buffer_store.downgrade(), BufferStore::handle_close_buffer);
 
         session.add_request_handler(
             worktree_store.downgrade(),
@@ -143,19 +144,11 @@ impl HeadlessProject {
 
         let buffer = buffer.await?;
         let buffer_id = buffer.read_with(&cx, |b, _| b.remote_id())?;
-
-        cx.spawn(|mut cx| async move {
-            BufferStore::create_buffer_for_peer(
-                buffer_store,
-                PEER_ID,
-                buffer_id,
-                PROJECT_ID,
-                session,
-                &mut cx,
-            )
-            .await
-        })
-        .detach();
+        buffer_store.update(&mut cx, |buffer_store, cx| {
+            buffer_store
+                .create_buffer_for_peer(&buffer, PEER_ID, PROJECT_ID, session, cx)
+                .detach_and_log_err(cx);
+        })?;
 
         Ok(proto::OpenBufferResponse {
             buffer_id: buffer_id.to_proto(),
@@ -190,16 +183,17 @@ impl HeadlessProject {
         while let Some(buffer) = results.next().await {
             let buffer_id = buffer.update(&mut cx, |this, _| this.remote_id())?;
             response.buffer_ids.push(buffer_id.to_proto());
-
-            BufferStore::create_buffer_for_peer(
-                buffer_store.clone(),
-                PEER_ID,
-                buffer_id,
-                PROJECT_ID,
-                client.clone(),
-                &mut cx,
-            )
-            .await?;
+            buffer_store
+                .update(&mut cx, |buffer_store, cx| {
+                    buffer_store.create_buffer_for_peer(
+                        &buffer,
+                        PEER_ID,
+                        PROJECT_ID,
+                        client.clone(),
+                        cx,
+                    )
+                })?
+                .await?;
         }
 
         Ok(response)

crates/remote_server/src/remote_editing_tests.rs 🔗

@@ -4,7 +4,7 @@ use clock::FakeSystemClock;
 use fs::{FakeFs, Fs};
 use gpui::{Context, Model, TestAppContext};
 use http_client::FakeHttpClient;
-use language::LanguageRegistry;
+use language::{Buffer, LanguageRegistry};
 use node_runtime::FakeNodeRuntime;
 use project::{
     search::{SearchQuery, SearchResult},
@@ -121,7 +121,7 @@ async fn test_remote_editing(cx: &mut TestAppContext, server_cx: &mut TestAppCon
 
 #[gpui::test]
 async fn test_remote_project_search(cx: &mut TestAppContext, server_cx: &mut TestAppContext) {
-    let (project, _, _) = init_test(cx, server_cx).await;
+    let (project, headless, _) = init_test(cx, server_cx).await;
 
     project
         .update(cx, |project, cx| {
@@ -132,33 +132,55 @@ async fn test_remote_project_search(cx: &mut TestAppContext, server_cx: &mut Tes
 
     cx.run_until_parked();
 
-    let mut receiver = project.update(cx, |project, cx| {
-        project.search(
-            SearchQuery::text(
-                "project",
-                false,
-                true,
-                false,
-                Default::default(),
-                Default::default(),
+    async fn do_search(project: &Model<Project>, mut cx: TestAppContext) -> Model<Buffer> {
+        let mut receiver = project.update(&mut cx, |project, cx| {
+            project.search(
+                SearchQuery::text(
+                    "project",
+                    false,
+                    true,
+                    false,
+                    Default::default(),
+                    Default::default(),
+                )
+                .unwrap(),
+                cx,
             )
-            .unwrap(),
-            cx,
-        )
+        });
+
+        let first_response = receiver.next().await.unwrap();
+        let SearchResult::Buffer { buffer, .. } = first_response else {
+            panic!("incorrect result");
+        };
+        buffer.update(&mut cx, |buffer, cx| {
+            assert_eq!(
+                buffer.file().unwrap().full_path(cx).to_string_lossy(),
+                "project1/README.md"
+            )
+        });
+
+        assert!(receiver.next().await.is_none());
+        buffer
+    }
+
+    let buffer = do_search(&project, cx.clone()).await;
+
+    // test that the headless server is tracking which buffers we have open correctly.
+    cx.run_until_parked();
+    headless.update(server_cx, |headless, cx| {
+        assert!(!headless.buffer_store.read(cx).shared_buffers().is_empty())
     });
+    do_search(&project, cx.clone()).await;
 
-    let first_response = receiver.next().await.unwrap();
-    let SearchResult::Buffer { buffer, .. } = first_response else {
-        panic!("incorrect result");
-    };
-    buffer.update(cx, |buffer, cx| {
-        assert_eq!(
-            buffer.file().unwrap().full_path(cx).to_string_lossy(),
-            "project1/README.md"
-        )
+    cx.update(|_| {
+        drop(buffer);
+    });
+    cx.run_until_parked();
+    headless.update(server_cx, |headless, cx| {
+        assert!(headless.buffer_store.read(cx).shared_buffers().is_empty())
     });
 
-    assert!(receiver.next().await.is_none());
+    do_search(&project, cx.clone()).await;
 }
 
 fn init_logger() {