Never close buffers when sharing

Max Brunsfeld and Antonio Scandurra created

Co-Authored-By: Antonio Scandurra <me@as-cii.com>

Change summary

crates/project/src/lsp_command.rs |  23 -
crates/project/src/project.rs     | 364 +++++++++++---------------------
crates/server/src/rpc.rs          |  32 --
3 files changed, 140 insertions(+), 279 deletions(-)

Detailed changes

crates/project/src/lsp_command.rs 🔗

@@ -1,4 +1,4 @@
-use crate::{BufferRequestHandle, DocumentHighlight, Location, Project, ProjectTransaction};
+use crate::{DocumentHighlight, Location, Project, ProjectTransaction};
 use anyhow::{anyhow, Result};
 use async_trait::async_trait;
 use client::{proto, PeerId};
@@ -48,7 +48,6 @@ pub(crate) trait LspCommand: 'static + Sized {
         message: <Self::ProtoRequest as proto::RequestMessage>::Response,
         project: ModelHandle<Project>,
         buffer: ModelHandle<Buffer>,
-        request_handle: BufferRequestHandle,
         cx: AsyncAppContext,
     ) -> Result<Self::Response>;
     fn buffer_id_from_proto(message: &Self::ProtoRequest) -> u64;
@@ -162,7 +161,6 @@ impl LspCommand for PrepareRename {
         message: proto::PrepareRenameResponse,
         _: ModelHandle<Project>,
         buffer: ModelHandle<Buffer>,
-        _: BufferRequestHandle,
         mut cx: AsyncAppContext,
     ) -> Result<Option<Range<Anchor>>> {
         if message.can_rename {
@@ -279,7 +277,6 @@ impl LspCommand for PerformRename {
         message: proto::PerformRenameResponse,
         project: ModelHandle<Project>,
         _: ModelHandle<Buffer>,
-        request_handle: BufferRequestHandle,
         mut cx: AsyncAppContext,
     ) -> Result<ProjectTransaction> {
         let message = message
@@ -287,12 +284,7 @@ impl LspCommand for PerformRename {
             .ok_or_else(|| anyhow!("missing transaction"))?;
         project
             .update(&mut cx, |project, cx| {
-                project.deserialize_project_transaction(
-                    message,
-                    self.push_to_history,
-                    request_handle,
-                    cx,
-                )
+                project.deserialize_project_transaction(message, self.push_to_history, cx)
             })
             .await
     }
@@ -435,16 +427,13 @@ impl LspCommand for GetDefinition {
         message: proto::GetDefinitionResponse,
         project: ModelHandle<Project>,
         _: ModelHandle<Buffer>,
-        request_handle: BufferRequestHandle,
         mut cx: AsyncAppContext,
     ) -> Result<Vec<Location>> {
         let mut locations = Vec::new();
         for location in message.locations {
             let buffer = location.buffer.ok_or_else(|| anyhow!("missing buffer"))?;
             let buffer = project
-                .update(&mut cx, |this, cx| {
-                    this.deserialize_buffer(buffer, request_handle.clone(), cx)
-                })
+                .update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx))
                 .await?;
             let start = location
                 .start
@@ -586,16 +575,13 @@ impl LspCommand for GetReferences {
         message: proto::GetReferencesResponse,
         project: ModelHandle<Project>,
         _: ModelHandle<Buffer>,
-        request_handle: BufferRequestHandle,
         mut cx: AsyncAppContext,
     ) -> Result<Vec<Location>> {
         let mut locations = Vec::new();
         for location in message.locations {
             let buffer = location.buffer.ok_or_else(|| anyhow!("missing buffer"))?;
             let target_buffer = project
-                .update(&mut cx, |this, cx| {
-                    this.deserialize_buffer(buffer, request_handle.clone(), cx)
-                })
+                .update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx))
                 .await?;
             let start = location
                 .start
@@ -720,7 +706,6 @@ impl LspCommand for GetDocumentHighlights {
         message: proto::GetDocumentHighlightsResponse,
         _: ModelHandle<Project>,
         _: ModelHandle<Buffer>,
-        _: BufferRequestHandle,
         _: AsyncAppContext,
     ) -> Result<Vec<DocumentHighlight>> {
         Ok(message

crates/project/src/project.rs 🔗

@@ -59,24 +59,18 @@ pub struct Project {
     subscriptions: Vec<client::Subscription>,
     language_servers_with_diagnostics_running: isize,
     opened_buffer: (Rc<RefCell<watch::Sender<()>>>, watch::Receiver<()>),
+    shared_buffers: HashMap<PeerId, HashSet<u64>>,
     loading_buffers: HashMap<
         ProjectPath,
         postage::watch::Receiver<Option<Result<ModelHandle<Buffer>, Arc<anyhow::Error>>>>,
     >,
-    buffers_state: Rc<RefCell<ProjectBuffers>>,
-    shared_buffers: HashMap<PeerId, HashMap<u64, ModelHandle<Buffer>>>,
+    opened_buffers: HashMap<u64, OpenBuffer>,
     nonce: u128,
 }
 
-#[derive(Default)]
-struct ProjectBuffers {
-    buffer_request_count: usize,
-    preserved_buffers: Vec<ModelHandle<Buffer>>,
-    open_buffers: HashMap<u64, OpenBuffer>,
-}
-
 enum OpenBuffer {
-    Loaded(WeakModelHandle<Buffer>),
+    Strong(ModelHandle<Buffer>),
+    Weak(WeakModelHandle<Buffer>),
     Loading(Vec<Operation>),
 }
 
@@ -155,8 +149,6 @@ pub struct Symbol {
     pub signature: [u8; 32],
 }
 
-pub struct BufferRequestHandle(Rc<RefCell<ProjectBuffers>>);
-
 #[derive(Default)]
 pub struct ProjectTransaction(pub HashMap<ModelHandle<Buffer>, language::Transaction>);
 
@@ -287,9 +279,9 @@ impl Project {
             Self {
                 worktrees: Default::default(),
                 collaborators: Default::default(),
-                buffers_state: Default::default(),
-                loading_buffers: Default::default(),
+                opened_buffers: Default::default(),
                 shared_buffers: Default::default(),
+                loading_buffers: Default::default(),
                 client_state: ProjectClientState::Local {
                     is_shared: false,
                     remote_id_tx,
@@ -359,7 +351,7 @@ impl Project {
                 language_servers_with_diagnostics_running: 0,
                 language_servers: Default::default(),
                 started_language_servers: Default::default(),
-                buffers_state: Default::default(),
+                opened_buffers: Default::default(),
                 nonce: StdRng::from_entropy().gen(),
             };
             for worktree in worktrees {
@@ -399,25 +391,21 @@ impl Project {
     }
 
     #[cfg(any(test, feature = "test-support"))]
-    pub fn shared_buffer(&self, peer_id: PeerId, remote_id: u64) -> Option<ModelHandle<Buffer>> {
-        self.shared_buffers
-            .get(&peer_id)
-            .and_then(|buffers| buffers.get(&remote_id))
-            .cloned()
+    pub fn buffer_for_id(&self, remote_id: u64, cx: &AppContext) -> Option<ModelHandle<Buffer>> {
+        self.opened_buffers
+            .get(&remote_id)
+            .and_then(|buffer| buffer.upgrade(cx))
     }
 
     #[cfg(any(test, feature = "test-support"))]
-    pub fn has_buffered_operations(&self, cx: &AppContext) -> bool {
-        self.buffers_state
-            .borrow()
-            .open_buffers
-            .values()
-            .any(|buffer| match buffer {
-                OpenBuffer::Loaded(buffer) => buffer
-                    .upgrade(cx)
-                    .map_or(false, |buffer| buffer.read(cx).deferred_ops_len() > 0),
-                OpenBuffer::Loading(_) => true,
-            })
+    pub fn has_deferred_operations(&self, cx: &AppContext) -> bool {
+        self.opened_buffers.values().any(|buffer| match buffer {
+            OpenBuffer::Strong(buffer) => buffer.read(cx).deferred_ops_len() > 0,
+            OpenBuffer::Weak(buffer) => buffer
+                .upgrade(cx)
+                .map_or(false, |buffer| buffer.read(cx).deferred_ops_len() > 0),
+            OpenBuffer::Loading(_) => false,
+        })
     }
 
     #[cfg(any(test, feature = "test-support"))]
@@ -518,7 +506,7 @@ impl Project {
     pub fn share(&self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
         let rpc = self.client.clone();
         cx.spawn(|this, mut cx| async move {
-            let project_id = this.update(&mut cx, |this, _| {
+            let project_id = this.update(&mut cx, |this, cx| {
                 if let ProjectClientState::Local {
                     is_shared,
                     remote_id_rx,
@@ -526,6 +514,17 @@ impl Project {
                 } = &mut this.client_state
                 {
                     *is_shared = true;
+                    for open_buffer in this.opened_buffers.values_mut() {
+                        match open_buffer {
+                            OpenBuffer::Strong(_) => {}
+                            OpenBuffer::Weak(buffer) => {
+                                if let Some(buffer) = buffer.upgrade(cx) {
+                                    *open_buffer = OpenBuffer::Strong(buffer);
+                                }
+                            }
+                            OpenBuffer::Loading(_) => unreachable!(),
+                        }
+                    }
                     remote_id_rx
                         .borrow()
                         .ok_or_else(|| anyhow!("no project id"))
@@ -535,6 +534,7 @@ impl Project {
             })?;
 
             rpc.request(proto::ShareProject { project_id }).await?;
+
             let mut tasks = Vec::new();
             this.update(&mut cx, |this, cx| {
                 for worktree in this.worktrees(cx).collect::<Vec<_>>() {
@@ -563,6 +563,15 @@ impl Project {
                 } = &mut this.client_state
                 {
                     *is_shared = false;
+                    for open_buffer in this.opened_buffers.values_mut() {
+                        match open_buffer {
+                            OpenBuffer::Strong(buffer) => {
+                                *open_buffer = OpenBuffer::Weak(buffer.downgrade());
+                            }
+                            OpenBuffer::Weak(_) => {}
+                            OpenBuffer::Loading(_) => unreachable!(),
+                        }
+                    }
                     remote_id_rx
                         .borrow()
                         .ok_or_else(|| anyhow!("no project id"))
@@ -702,7 +711,6 @@ impl Project {
         let remote_worktree_id = worktree.read(cx).id();
         let path = path.clone();
         let path_string = path.to_string_lossy().to_string();
-        let request_handle = self.start_buffer_request(cx);
         cx.spawn(|this, mut cx| async move {
             let response = rpc
                 .request(proto::OpenBuffer {
@@ -712,11 +720,8 @@ impl Project {
                 })
                 .await?;
             let buffer = response.buffer.ok_or_else(|| anyhow!("missing buffer"))?;
-
-            this.update(&mut cx, |this, cx| {
-                this.deserialize_buffer(buffer, request_handle, cx)
-            })
-            .await
+            this.update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx))
+                .await
         })
     }
 
@@ -757,10 +762,6 @@ impl Project {
         })
     }
 
-    fn start_buffer_request(&self, cx: &AppContext) -> BufferRequestHandle {
-        BufferRequestHandle::new(self.buffers_state.clone(), cx)
-    }
-
     pub fn save_buffer_as(
         &self,
         buffer: ModelHandle<Buffer>,
@@ -789,20 +790,16 @@ impl Project {
     pub fn has_open_buffer(&self, path: impl Into<ProjectPath>, cx: &AppContext) -> bool {
         let path = path.into();
         if let Some(worktree) = self.worktree_for_id(path.worktree_id, cx) {
-            self.buffers_state
-                .borrow()
-                .open_buffers
-                .iter()
-                .any(|(_, buffer)| {
-                    if let Some(buffer) = buffer.upgrade(cx) {
-                        if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
-                            if file.worktree == worktree && file.path() == &path.path {
-                                return true;
-                            }
+            self.opened_buffers.iter().any(|(_, buffer)| {
+                if let Some(buffer) = buffer.upgrade(cx) {
+                    if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
+                        if file.worktree == worktree && file.path() == &path.path {
+                            return true;
                         }
                     }
-                    false
-                })
+                }
+                false
+            })
         } else {
             false
         }
@@ -814,19 +811,15 @@ impl Project {
         cx: &mut ModelContext<Self>,
     ) -> Option<ModelHandle<Buffer>> {
         let worktree = self.worktree_for_id(path.worktree_id, cx)?;
-        self.buffers_state
-            .borrow()
-            .open_buffers
-            .values()
-            .find_map(|buffer| {
-                let buffer = buffer.upgrade(cx)?;
-                let file = File::from_dyn(buffer.read(cx).file())?;
-                if file.worktree == worktree && file.path() == &path.path {
-                    Some(buffer)
-                } else {
-                    None
-                }
-            })
+        self.opened_buffers.values().find_map(|buffer| {
+            let buffer = buffer.upgrade(cx)?;
+            let file = File::from_dyn(buffer.read(cx).file())?;
+            if file.worktree == worktree && file.path() == &path.path {
+                Some(buffer)
+            } else {
+                None
+            }
+        })
     }
 
     fn register_buffer(
@@ -836,17 +829,18 @@ impl Project {
         cx: &mut ModelContext<Self>,
     ) -> Result<()> {
         let remote_id = buffer.read(cx).remote_id();
-        match self
-            .buffers_state
-            .borrow_mut()
-            .open_buffers
-            .insert(remote_id, OpenBuffer::Loaded(buffer.downgrade()))
-        {
+        let open_buffer = if self.is_remote() || self.is_shared() {
+            OpenBuffer::Strong(buffer.clone())
+        } else {
+            OpenBuffer::Weak(buffer.downgrade())
+        };
+
+        match self.opened_buffers.insert(remote_id, open_buffer) {
             None => {}
             Some(OpenBuffer::Loading(operations)) => {
                 buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))?
             }
-            Some(OpenBuffer::Loaded(existing_handle)) => {
+            Some(OpenBuffer::Weak(existing_handle)) => {
                 if existing_handle.upgrade(cx).is_some() {
                     Err(anyhow!(
                         "already registered buffer with remote id {}",
@@ -854,6 +848,10 @@ impl Project {
                     ))?
                 }
             }
+            Some(OpenBuffer::Strong(_)) => Err(anyhow!(
+                "already registered buffer with remote id {}",
+                remote_id
+            ))?,
         }
         self.assign_language_to_buffer(&buffer, worktree, cx);
         Ok(())
@@ -1173,7 +1171,7 @@ impl Project {
             path: relative_path.into(),
         };
 
-        for buffer in self.buffers_state.borrow().open_buffers.values() {
+        for buffer in self.opened_buffers.values() {
             if let Some(buffer) = buffer.upgrade(cx) {
                 if buffer
                     .read(cx)
@@ -1236,7 +1234,6 @@ impl Project {
 
         let remote_buffers = self.remote_id().zip(remote_buffers);
         let client = self.client.clone();
-        let request_handle = self.start_buffer_request(cx);
 
         cx.spawn(|this, mut cx| async move {
             let mut project_transaction = ProjectTransaction::default();
@@ -1255,12 +1252,7 @@ impl Project {
                     .ok_or_else(|| anyhow!("missing transaction"))?;
                 project_transaction = this
                     .update(&mut cx, |this, cx| {
-                        this.deserialize_project_transaction(
-                            response,
-                            push_to_history,
-                            request_handle,
-                            cx,
-                        )
+                        this.deserialize_project_transaction(response, push_to_history, cx)
                     })
                     .await?;
             }
@@ -1477,7 +1469,6 @@ impl Project {
                 cx,
             )
         } else if let Some(project_id) = self.remote_id() {
-            let request_handle = self.start_buffer_request(cx);
             let request = self.client.request(proto::OpenBufferForSymbol {
                 project_id,
                 symbol: Some(serialize_symbol(symbol)),
@@ -1485,10 +1476,8 @@ impl Project {
             cx.spawn(|this, mut cx| async move {
                 let response = request.await?;
                 let buffer = response.buffer.ok_or_else(|| anyhow!("invalid buffer"))?;
-                this.update(&mut cx, |this, cx| {
-                    this.deserialize_buffer(buffer, request_handle, cx)
-                })
-                .await
+                this.update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx))
+                    .await
             })
         } else {
             Task::ready(Err(anyhow!("project does not have a remote id")))
@@ -1867,7 +1856,6 @@ impl Project {
             })
         } else if let Some(project_id) = self.remote_id() {
             let client = self.client.clone();
-            let request_handle = self.start_buffer_request(cx);
             let request = proto::ApplyCodeAction {
                 project_id,
                 buffer_id: buffer_handle.read(cx).remote_id(),
@@ -1880,12 +1868,7 @@ impl Project {
                     .transaction
                     .ok_or_else(|| anyhow!("missing transaction"))?;
                 this.update(&mut cx, |this, cx| {
-                    this.deserialize_project_transaction(
-                        response,
-                        push_to_history,
-                        request_handle,
-                        cx,
-                    )
+                    this.deserialize_project_transaction(response, push_to_history, cx)
                 })
                 .await
             })
@@ -2150,9 +2133,7 @@ impl Project {
 
             let (buffers_tx, buffers_rx) = smol::channel::bounded(1024);
             let open_buffers = self
-                .buffers_state
-                .borrow()
-                .open_buffers
+                .opened_buffers
                 .values()
                 .filter_map(|b| b.upgrade(cx))
                 .collect::<HashSet<_>>();
@@ -2227,16 +2208,13 @@ impl Project {
             })
         } else if let Some(project_id) = self.remote_id() {
             let request = self.client.request(query.to_proto(project_id));
-            let request_handle = self.start_buffer_request(cx);
             cx.spawn(|this, mut cx| async move {
                 let response = request.await?;
                 let mut result = HashMap::default();
                 for location in response.locations {
                     let buffer = location.buffer.ok_or_else(|| anyhow!("missing buffer"))?;
                     let target_buffer = this
-                        .update(&mut cx, |this, cx| {
-                            this.deserialize_buffer(buffer, request_handle.clone(), cx)
-                        })
+                        .update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx))
                         .await?;
                     let start = location
                         .start
@@ -2284,12 +2262,11 @@ impl Project {
             }
         } else if let Some(project_id) = self.remote_id() {
             let rpc = self.client.clone();
-            let request_handle = self.start_buffer_request(cx);
             let message = request.to_proto(project_id, buffer);
             return cx.spawn(|this, cx| async move {
                 let response = rpc.request(message).await?;
                 request
-                    .response_from_proto(response, this, buffer_handle, request_handle, cx)
+                    .response_from_proto(response, this, buffer_handle, cx)
                     .await
             });
         }
@@ -2417,7 +2394,7 @@ impl Project {
     ) {
         let snapshot = worktree_handle.read(cx).snapshot();
         let mut buffers_to_delete = Vec::new();
-        for (buffer_id, buffer) in &self.buffers_state.borrow().open_buffers {
+        for (buffer_id, buffer) in &self.opened_buffers {
             if let Some(buffer) = buffer.upgrade(cx) {
                 buffer.update(cx, |buffer, cx| {
                     if let Some(old_file) = File::from_dyn(buffer.file()) {
@@ -2474,10 +2451,7 @@ impl Project {
         }
 
         for buffer_id in buffers_to_delete {
-            self.buffers_state
-                .borrow_mut()
-                .open_buffers
-                .remove(&buffer_id);
+            self.opened_buffers.remove(&buffer_id);
         }
     }
 
@@ -2604,8 +2578,7 @@ impl Project {
                 .remove(&peer_id)
                 .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?
                 .replica_id;
-            this.shared_buffers.remove(&peer_id);
-            for (_, buffer) in &this.buffers_state.borrow().open_buffers {
+            for (_, buffer) in &this.opened_buffers {
                 if let Some(buffer) = buffer.upgrade(cx) {
                     buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
                 }
@@ -2731,24 +2704,16 @@ impl Project {
                 .into_iter()
                 .map(|op| language::proto::deserialize_operation(op))
                 .collect::<Result<Vec<_>, _>>()?;
-            let is_remote = this.is_remote();
-            let mut buffers_state = this.buffers_state.borrow_mut();
-            let buffer_request_count = buffers_state.buffer_request_count;
-            match buffers_state.open_buffers.entry(buffer_id) {
+            match this.opened_buffers.entry(buffer_id) {
                 hash_map::Entry::Occupied(mut e) => match e.get_mut() {
-                    OpenBuffer::Loaded(buffer) => {
-                        if let Some(buffer) = buffer.upgrade(cx) {
-                            buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
-                        } else if is_remote && buffer_request_count > 0 {
-                            e.insert(OpenBuffer::Loading(ops));
-                        }
+                    OpenBuffer::Strong(buffer) => {
+                        buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
                     }
                     OpenBuffer::Loading(operations) => operations.extend_from_slice(&ops),
+                    _ => unreachable!(),
                 },
                 hash_map::Entry::Vacant(e) => {
-                    if is_remote && buffer_request_count > 0 {
-                        e.insert(OpenBuffer::Loading(ops));
-                    }
+                    e.insert(OpenBuffer::Loading(ops));
                 }
             }
             Ok(())
@@ -2770,9 +2735,7 @@ impl Project {
                 .ok_or_else(|| anyhow!("no such worktree"))?;
             let file = File::from_proto(file, worktree.clone(), cx)?;
             let buffer = this
-                .buffers_state
-                .borrow_mut()
-                .open_buffers
+                .opened_buffers
                 .get_mut(&buffer_id)
                 .and_then(|b| b.upgrade(cx))
                 .ok_or_else(|| anyhow!("no such buffer"))?;
@@ -2790,15 +2753,14 @@ impl Project {
         mut cx: AsyncAppContext,
     ) -> Result<proto::BufferSaved> {
         let buffer_id = envelope.payload.buffer_id;
-        let sender_id = envelope.original_sender_id()?;
         let requested_version = envelope.payload.version.try_into()?;
 
-        let (project_id, buffer) = this.update(&mut cx, |this, _| {
+        let (project_id, buffer) = this.update(&mut cx, |this, cx| {
             let project_id = this.remote_id().ok_or_else(|| anyhow!("not connected"))?;
             let buffer = this
-                .shared_buffers
-                .get(&sender_id)
-                .and_then(|shared_buffers| shared_buffers.get(&buffer_id).cloned())
+                .opened_buffers
+                .get(&buffer_id)
+                .map(|buffer| buffer.upgrade(cx).unwrap())
                 .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?;
             Ok::<_, anyhow::Error>((project_id, buffer))
         })?;
@@ -2827,16 +2789,12 @@ impl Project {
     ) -> Result<proto::FormatBuffersResponse> {
         let sender_id = envelope.original_sender_id()?;
         let format = this.update(&mut cx, |this, cx| {
-            let shared_buffers = this
-                .shared_buffers
-                .get(&sender_id)
-                .ok_or_else(|| anyhow!("peer has no buffers"))?;
             let mut buffers = HashSet::default();
             for buffer_id in &envelope.payload.buffer_ids {
                 buffers.insert(
-                    shared_buffers
+                    this.opened_buffers
                         .get(buffer_id)
-                        .cloned()
+                        .map(|buffer| buffer.upgrade(cx).unwrap())
                         .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?,
                 );
             }
@@ -2858,17 +2816,16 @@ impl Project {
         _: Arc<Client>,
         mut cx: AsyncAppContext,
     ) -> Result<proto::GetCompletionsResponse> {
-        let sender_id = envelope.original_sender_id()?;
         let position = envelope
             .payload
             .position
             .and_then(language::proto::deserialize_anchor)
             .ok_or_else(|| anyhow!("invalid position"))?;
         let version = clock::Global::from(envelope.payload.version);
-        let buffer = this.read_with(&cx, |this, _| {
-            this.shared_buffers
-                .get(&sender_id)
-                .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned())
+        let buffer = this.read_with(&cx, |this, cx| {
+            this.opened_buffers
+                .get(&envelope.payload.buffer_id)
+                .map(|buffer| buffer.upgrade(cx).unwrap())
                 .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))
         })?;
         if !buffer
@@ -2897,12 +2854,11 @@ impl Project {
         _: Arc<Client>,
         mut cx: AsyncAppContext,
     ) -> Result<proto::ApplyCompletionAdditionalEditsResponse> {
-        let sender_id = envelope.original_sender_id()?;
         let apply_additional_edits = this.update(&mut cx, |this, cx| {
             let buffer = this
-                .shared_buffers
-                .get(&sender_id)
-                .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned())
+                .opened_buffers
+                .get(&envelope.payload.buffer_id)
+                .map(|buffer| buffer.upgrade(cx).unwrap())
                 .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?;
             let language = buffer.read(cx).language();
             let completion = language::proto::deserialize_completion(
@@ -2931,7 +2887,6 @@ impl Project {
         _: Arc<Client>,
         mut cx: AsyncAppContext,
     ) -> Result<proto::GetCodeActionsResponse> {
-        let sender_id = envelope.original_sender_id()?;
         let start = envelope
             .payload
             .start
@@ -2942,10 +2897,10 @@ impl Project {
             .end
             .and_then(language::proto::deserialize_anchor)
             .ok_or_else(|| anyhow!("invalid end"))?;
-        let buffer = this.update(&mut cx, |this, _| {
-            this.shared_buffers
-                .get(&sender_id)
-                .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned())
+        let buffer = this.update(&mut cx, |this, cx| {
+            this.opened_buffers
+                .get(&envelope.payload.buffer_id)
+                .map(|buffer| buffer.upgrade(cx).unwrap())
                 .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))
         })?;
         let version = buffer.read_with(&cx, |buffer, _| buffer.version());
@@ -2981,9 +2936,9 @@ impl Project {
         )?;
         let apply_code_action = this.update(&mut cx, |this, cx| {
             let buffer = this
-                .shared_buffers
-                .get(&sender_id)
-                .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned())
+                .opened_buffers
+                .get(&envelope.payload.buffer_id)
+                .map(|buffer| buffer.upgrade(cx).unwrap())
                 .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?;
             Ok::<_, anyhow::Error>(this.apply_code_action(buffer, action, false, cx))
         })?;
@@ -3010,9 +2965,9 @@ impl Project {
         let (request, buffer_version) = this.update(&mut cx, |this, cx| {
             let buffer_id = T::buffer_id_from_proto(&envelope.payload);
             let buffer_handle = this
-                .shared_buffers
-                .get(&sender_id)
-                .and_then(|shared_buffers| shared_buffers.get(&buffer_id).cloned())
+                .opened_buffers
+                .get(&buffer_id)
+                .map(|buffer| buffer.upgrade(cx).unwrap())
                 .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?;
             let buffer = buffer_handle.read(cx);
             let buffer_version = buffer.version();
@@ -3168,16 +3123,13 @@ impl Project {
         &mut self,
         message: proto::ProjectTransaction,
         push_to_history: bool,
-        request_handle: BufferRequestHandle,
         cx: &mut ModelContext<Self>,
     ) -> Task<Result<ProjectTransaction>> {
         cx.spawn(|this, mut cx| async move {
             let mut project_transaction = ProjectTransaction::default();
             for (buffer, transaction) in message.buffers.into_iter().zip(message.transactions) {
                 let buffer = this
-                    .update(&mut cx, |this, cx| {
-                        this.deserialize_buffer(buffer, request_handle.clone(), cx)
-                    })
+                    .update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx))
                     .await?;
                 let transaction = language::proto::deserialize_transaction(transaction)?;
                 project_transaction.0.insert(buffer, transaction);
@@ -3209,15 +3161,13 @@ impl Project {
     ) -> proto::Buffer {
         let buffer_id = buffer.read(cx).remote_id();
         let shared_buffers = self.shared_buffers.entry(peer_id).or_default();
-        match shared_buffers.entry(buffer_id) {
-            hash_map::Entry::Occupied(_) => proto::Buffer {
+        if shared_buffers.insert(buffer_id) {
+            proto::Buffer {
+                variant: Some(proto::buffer::Variant::State(buffer.read(cx).to_proto())),
+            }
+        } else {
+            proto::Buffer {
                 variant: Some(proto::buffer::Variant::Id(buffer_id)),
-            },
-            hash_map::Entry::Vacant(entry) => {
-                entry.insert(buffer.clone());
-                proto::Buffer {
-                    variant: Some(proto::buffer::Variant::State(buffer.read(cx).to_proto())),
-                }
             }
         }
     }
@@ -3225,7 +3175,6 @@ impl Project {
     fn deserialize_buffer(
         &mut self,
         buffer: proto::Buffer,
-        request_handle: BufferRequestHandle,
         cx: &mut ModelContext<Self>,
     ) -> Task<Result<ModelHandle<Buffer>>> {
         let replica_id = self.replica_id();
@@ -3237,9 +3186,7 @@ impl Project {
                 proto::buffer::Variant::Id(id) => {
                     let buffer = loop {
                         let buffer = this.read_with(&cx, |this, cx| {
-                            this.buffers_state
-                                .borrow()
-                                .open_buffers
+                            this.opened_buffers
                                 .get(&id)
                                 .and_then(|buffer| buffer.upgrade(cx))
                         });
@@ -3275,7 +3222,6 @@ impl Project {
                         Buffer::from_proto(replica_id, buffer, buffer_file, cx).unwrap()
                     });
 
-                    request_handle.preserve_buffer(buffer.clone());
                     this.update(&mut cx, |this, cx| {
                         this.register_buffer(&buffer, buffer_worktree.as_ref(), cx)
                     })?;
@@ -3317,20 +3263,13 @@ impl Project {
     }
 
     async fn handle_close_buffer(
-        this: ModelHandle<Self>,
-        envelope: TypedEnvelope<proto::CloseBuffer>,
+        _: ModelHandle<Self>,
+        _: TypedEnvelope<proto::CloseBuffer>,
         _: Arc<Client>,
-        mut cx: AsyncAppContext,
+        _: AsyncAppContext,
     ) -> Result<()> {
-        this.update(&mut cx, |this, cx| {
-            if let Some(shared_buffers) =
-                this.shared_buffers.get_mut(&envelope.original_sender_id()?)
-            {
-                shared_buffers.remove(&envelope.payload.buffer_id);
-                cx.notify();
-            }
-            Ok(())
-        })
+        // TODO: use this for following
+        Ok(())
     }
 
     async fn handle_buffer_saved(
@@ -3348,9 +3287,7 @@ impl Project {
 
         this.update(&mut cx, |this, cx| {
             let buffer = this
-                .buffers_state
-                .borrow()
-                .open_buffers
+                .opened_buffers
                 .get(&envelope.payload.buffer_id)
                 .and_then(|buffer| buffer.upgrade(cx));
             if let Some(buffer) = buffer {
@@ -3376,9 +3313,7 @@ impl Project {
             .into();
         this.update(&mut cx, |this, cx| {
             let buffer = this
-                .buffers_state
-                .borrow()
-                .open_buffers
+                .opened_buffers
                 .get(&payload.buffer_id)
                 .and_then(|buffer| buffer.upgrade(cx));
             if let Some(buffer) = buffer {
@@ -3428,48 +3363,6 @@ impl Project {
     }
 }
 
-impl BufferRequestHandle {
-    fn new(state: Rc<RefCell<ProjectBuffers>>, cx: &AppContext) -> Self {
-        {
-            let state = &mut *state.borrow_mut();
-            state.buffer_request_count += 1;
-            if state.buffer_request_count == 1 {
-                state.preserved_buffers.extend(
-                    state
-                        .open_buffers
-                        .values()
-                        .filter_map(|buffer| buffer.upgrade(cx)),
-                )
-            }
-        }
-        Self(state)
-    }
-
-    fn preserve_buffer(&self, buffer: ModelHandle<Buffer>) {
-        self.0.borrow_mut().preserved_buffers.push(buffer);
-    }
-}
-
-impl Clone for BufferRequestHandle {
-    fn clone(&self) -> Self {
-        self.0.borrow_mut().buffer_request_count += 1;
-        Self(self.0.clone())
-    }
-}
-
-impl Drop for BufferRequestHandle {
-    fn drop(&mut self) {
-        let mut state = self.0.borrow_mut();
-        state.buffer_request_count -= 1;
-        if state.buffer_request_count == 0 {
-            state.preserved_buffers.clear();
-            state
-                .open_buffers
-                .retain(|_, buffer| matches!(buffer, OpenBuffer::Loaded(_)))
-        }
-    }
-}
-
 impl WorktreeHandle {
     pub fn upgrade(&self, cx: &AppContext) -> Option<ModelHandle<Worktree>> {
         match self {
@@ -3482,7 +3375,8 @@ impl WorktreeHandle {
 impl OpenBuffer {
     pub fn upgrade(&self, cx: &impl UpgradeModelHandle) -> Option<ModelHandle<Buffer>> {
         match self {
-            OpenBuffer::Loaded(handle) => handle.upgrade(cx),
+            OpenBuffer::Strong(handle) => Some(handle.clone()),
+            OpenBuffer::Weak(handle) => handle.upgrade(cx),
             OpenBuffer::Loading(_) => None,
         }
     }

crates/server/src/rpc.rs 🔗

@@ -1165,14 +1165,6 @@ mod tests {
         //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
         //     .await;
 
-        // Close the buffer as client A, see that the buffer is closed.
-        cx_a.update(move |_| drop(buffer_a));
-        project_a
-            .condition(&cx_a, |project, cx| {
-                !project.has_open_buffer((worktree_id, "b.txt"), cx)
-            })
-            .await;
-
         // Dropping the client B's project removes client B from client A's collaborators.
         cx_b.update(move |_| drop(project_b));
         project_a
@@ -2535,14 +2527,6 @@ mod tests {
             );
         });
         assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
-
-        cx_b.update(|_| {
-            drop(definitions_1);
-            drop(definitions_2);
-        });
-        project_b
-            .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
-            .await;
     }
 
     #[gpui::test(iterations = 10)]
@@ -4370,21 +4354,19 @@ mod tests {
                 .unwrap()
                 .read_with(guest_cx, |project, cx| {
                     assert!(
-                        !project.has_buffered_operations(cx),
-                        "guest {} has buffered operations",
+                        !project.has_deferred_operations(cx),
+                        "guest {} has deferred operations",
                         guest_id,
                     );
                 });
 
             for guest_buffer in &guest_client.buffers {
                 let buffer_id = guest_buffer.read_with(guest_cx, |buffer, _| buffer.remote_id());
-                let host_buffer = host_project.read_with(&host_cx, |project, _| {
-                    project
-                        .shared_buffer(guest_client.peer_id, buffer_id)
-                        .expect(&format!(
-                            "host does not have buffer for guest:{}, peer:{}, id:{}",
-                            guest_id, guest_client.peer_id, buffer_id
-                        ))
+                let host_buffer = host_project.read_with(&host_cx, |project, cx| {
+                    project.buffer_for_id(buffer_id, cx).expect(&format!(
+                        "host does not have buffer for guest:{}, peer:{}, id:{}",
+                        guest_id, guest_client.peer_id, buffer_id
+                    ))
                 });
                 assert_eq!(
                     guest_buffer.read_with(guest_cx, |buffer, _| buffer.text()),