Move buffers from worktree to project

Max Brunsfeld and Nathan Sobo created

Co-Authored-By: Nathan Sobo <nathan@zed.dev>

Change summary

crates/diagnostics/src/diagnostics.rs |   13 
crates/language/src/buffer.rs         |    9 
crates/project/src/project.rs         | 1281 +++++++++++++++++++++++++++-
crates/project/src/worktree.rs        |  966 +--------------------
4 files changed, 1,264 insertions(+), 1,005 deletions(-)

Detailed changes

crates/diagnostics/src/diagnostics.rs 🔗

@@ -764,9 +764,9 @@ mod tests {
             )
             .await;
 
-        let (worktree, _) = project
+        let worktree = project
             .update(&mut cx, |project, cx| {
-                project.find_or_create_worktree_for_abs_path("/test", false, cx)
+                project.add_local_worktree("/test", false, cx)
             })
             .await
             .unwrap();
@@ -777,9 +777,8 @@ mod tests {
             worktree
                 .as_local_mut()
                 .unwrap()
-                .update_diagnostic_entries(
+                .update_diagnostics(
                     Arc::from("/test/main.rs".as_ref()),
-                    None,
                     vec![
                         DiagnosticEntry {
                             range: PointUtf16::new(1, 8)..PointUtf16::new(1, 9),
@@ -930,9 +929,8 @@ mod tests {
             worktree
                 .as_local_mut()
                 .unwrap()
-                .update_diagnostic_entries(
+                .update_diagnostics(
                     Arc::from("/test/consts.rs".as_ref()),
-                    None,
                     vec![DiagnosticEntry {
                         range: PointUtf16::new(0, 15)..PointUtf16::new(0, 15),
                         diagnostic: Diagnostic {
@@ -1036,9 +1034,8 @@ mod tests {
             worktree
                 .as_local_mut()
                 .unwrap()
-                .update_diagnostic_entries(
+                .update_diagnostics(
                     Arc::from("/test/consts.rs".as_ref()),
-                    None,
                     vec![
                         DiagnosticEntry {
                             range: PointUtf16::new(0, 15)..PointUtf16::new(0, 15),

crates/language/src/buffer.rs 🔗

@@ -856,7 +856,7 @@ impl Buffer {
         version: Option<i32>,
         mut diagnostics: Vec<DiagnosticEntry<T>>,
         cx: &mut ModelContext<Self>,
-    ) -> Result<Operation>
+    ) -> Result<()>
     where
         T: Copy + Ord + TextDimension + Sub<Output = T> + Clip + ToPoint,
     {
@@ -944,10 +944,13 @@ impl Buffer {
 
         let set = DiagnosticSet::new(sanitized_diagnostics, content);
         self.apply_diagnostic_update(set.clone(), cx);
-        Ok(Operation::UpdateDiagnostics {
+
+        let op = Operation::UpdateDiagnostics {
             diagnostics: set.iter().cloned().collect(),
             lamport_timestamp: self.text.lamport_clock.tick(),
-        })
+        };
+        self.send_operation(op, cx);
+        Ok(())
     }
 
     fn request_autoindent(&mut self, cx: &mut ModelContext<Self>) {

crates/project/src/project.rs 🔗

@@ -13,17 +13,22 @@ use gpui::{
     WeakModelHandle,
 };
 use language::{
-    Bias, Buffer, DiagnosticEntry, File as _, Language, LanguageRegistry, ToOffset, ToPointUtf16,
+    range_from_lsp, Bias, Buffer, Diagnostic, DiagnosticEntry, File as _, Language,
+    LanguageRegistry, Operation, ToOffset, ToPointUtf16,
 };
 use lsp::{DiagnosticSeverity, LanguageServer};
 use postage::{prelude::Stream, watch};
 use smol::block_on;
 use std::{
+    convert::TryInto,
     ops::Range,
     path::{Path, PathBuf},
-    sync::{atomic::AtomicBool, Arc},
+    sync::{
+        atomic::{AtomicBool, Ordering::SeqCst},
+        Arc,
+    },
 };
-use util::{ResultExt, TryFutureExt as _};
+use util::{post_inc, ResultExt, TryFutureExt as _};
 
 pub use fs::*;
 pub use worktree::*;
@@ -40,6 +45,19 @@ pub struct Project {
     collaborators: HashMap<PeerId, Collaborator>,
     subscriptions: Vec<client::Subscription>,
     language_servers_with_diagnostics_running: isize,
+    open_buffers: HashMap<usize, OpenBuffer>,
+    loading_buffers: HashMap<
+        ProjectPath,
+        postage::watch::Receiver<
+            Option<Result<(ModelHandle<Buffer>, Arc<AtomicBool>), Arc<anyhow::Error>>>,
+        >,
+    >,
+    shared_buffers: HashMap<PeerId, HashMap<u64, ModelHandle<Buffer>>>,
+}
+
+enum OpenBuffer {
+    Operations(Vec<Operation>),
+    Loaded(WeakModelHandle<Buffer>),
 }
 
 enum WorktreeHandle {
@@ -192,6 +210,9 @@ impl Project {
             Self {
                 worktrees: Default::default(),
                 collaborators: Default::default(),
+                open_buffers: Default::default(),
+                loading_buffers: Default::default(),
+                shared_buffers: Default::default(),
                 client_state: ProjectClientState::Local {
                     is_shared: false,
                     remote_id_tx,
@@ -251,6 +272,9 @@ impl Project {
         Ok(cx.add_model(|cx| {
             let mut this = Self {
                 worktrees: Vec::new(),
+                open_buffers: Default::default(),
+                loading_buffers: Default::default(),
+                shared_buffers: Default::default(),
                 active_entry: None,
                 collaborators,
                 languages,
@@ -474,11 +498,59 @@ impl Project {
         let worktree = if let Some(worktree) = self.worktree_for_id(path.worktree_id, cx) {
             worktree
         } else {
-            return cx.spawn(|_, _| async move { Err(anyhow!("no such worktree")) });
+            return cx
+                .foreground()
+                .spawn(async move { Err(anyhow!("no such worktree")) });
+        };
+
+        // If there is already a buffer for the given path, then return it.
+        let existing_buffer = self.get_open_buffer(&path, cx);
+        if let Some(existing_buffer) = existing_buffer {
+            return cx.foreground().spawn(async move { Ok(existing_buffer) });
+        }
+
+        let mut loading_watch = match self.loading_buffers.entry(path.clone()) {
+            // If the given path is already being loaded, then wait for that existing
+            // task to complete and return the same buffer.
+            hash_map::Entry::Occupied(e) => e.get().clone(),
+
+            // Otherwise, record the fact that this path is now being loaded.
+            hash_map::Entry::Vacant(entry) => {
+                let (mut tx, rx) = postage::watch::channel();
+                entry.insert(rx.clone());
+
+                let load_buffer = worktree.update(cx, |worktree, cx| match worktree {
+                    Worktree::Local(worktree) => worktree.open_buffer(&path.path, cx),
+                    Worktree::Remote(worktree) => worktree.open_buffer(&path.path, cx),
+                });
+
+                cx.spawn(move |this, mut cx| async move {
+                    let load_result = load_buffer.await;
+                    *tx.borrow_mut() = Some(this.update(&mut cx, |this, _| {
+                        // Record the fact that the buffer is no longer loading.
+                        this.loading_buffers.remove(&path);
+                        let buffer = load_result.map_err(Arc::new)?;
+                        this.open_buffers
+                            .insert(buffer.id(), OpenBuffer::Loaded(buffer.downgrade()));
+                        Ok((buffer, Arc::new(AtomicBool::new(true))))
+                    }));
+                })
+                .detach();
+                rx
+            }
         };
-        let buffer_task = worktree.update(cx, |worktree, cx| worktree.open_buffer(path.path, cx));
+
         cx.spawn(|this, mut cx| async move {
-            let (buffer, buffer_is_new) = buffer_task.await?;
+            let (buffer, buffer_is_new) = loop {
+                if let Some(result) = loading_watch.borrow().as_ref() {
+                    break match result {
+                        Ok((buf, is_new)) => Ok((buf.clone(), is_new.fetch_and(false, SeqCst))),
+                        Err(error) => Err(anyhow!("{}", error)),
+                    };
+                }
+                loading_watch.recv().await;
+            }?;
+
             if buffer_is_new {
                 this.update(&mut cx, |this, cx| {
                     this.assign_language_to_buffer(worktree, buffer.clone(), cx)
@@ -506,6 +578,8 @@ impl Project {
                 })
                 .await?;
             this.update(&mut cx, |this, cx| {
+                this.open_buffers
+                    .insert(buffer.id(), OpenBuffer::Loaded(buffer.downgrade()));
                 this.assign_language_to_buffer(worktree, buffer, cx)
             });
             Ok(())
@@ -515,8 +589,43 @@ impl Project {
     #[cfg(any(test, feature = "test-support"))]
     pub fn has_open_buffer(&self, path: impl Into<ProjectPath>, cx: &AppContext) -> bool {
         let path = path.into();
-        self.worktree_for_id(path.worktree_id, cx)
-            .map_or(false, |tree| tree.read(cx).has_open_buffer(path.path, cx))
+        if let Some(worktree) = self.worktree_for_id(path.worktree_id, cx) {
+            self.open_buffers.iter().any(|(_, buffer)| {
+                if let Some(buffer) = buffer.upgrade(cx) {
+                    if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
+                        if file.worktree == worktree && file.path() == &path.path {
+                            return true;
+                        }
+                    }
+                }
+                false
+            })
+        } else {
+            false
+        }
+    }
+
+    fn get_open_buffer(
+        &mut self,
+        path: &ProjectPath,
+        cx: &mut ModelContext<Self>,
+    ) -> Option<ModelHandle<Buffer>> {
+        let mut result = None;
+        let worktree = self.worktree_for_id(path.worktree_id, cx)?;
+        self.open_buffers.retain(|_, buffer| {
+            if let OpenBuffer::Loaded(buffer) = buffer {
+                if let Some(buffer) = buffer.upgrade(cx) {
+                    if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
+                        if file.worktree == worktree && file.path() == &path.path {
+                            result = Some(buffer);
+                        }
+                    }
+                    return true;
+                }
+            }
+            false
+        });
+        result
     }
 
     fn assign_language_to_buffer(
@@ -670,13 +779,13 @@ impl Project {
         Some(language_server)
     }
 
-    fn update_diagnostics(
+    pub fn update_diagnostics(
         &mut self,
-        diagnostics: lsp::PublishDiagnosticsParams,
+        params: lsp::PublishDiagnosticsParams,
         disk_based_sources: &HashSet<String>,
         cx: &mut ModelContext<Self>,
     ) -> Result<()> {
-        let path = diagnostics
+        let path = params
             .uri
             .to_file_path()
             .map_err(|_| anyhow!("URI is not a file"))?;
@@ -687,13 +796,113 @@ impl Project {
             worktree_id: worktree.read(cx).id(),
             path: relative_path.into(),
         };
+        let mut next_group_id = 0;
+        let mut diagnostics = Vec::default();
+        let mut primary_diagnostic_group_ids = HashMap::default();
+        let mut sources_by_group_id = HashMap::default();
+        let mut supporting_diagnostic_severities = HashMap::default();
+        for diagnostic in &params.diagnostics {
+            let source = diagnostic.source.as_ref();
+            let code = diagnostic.code.as_ref().map(|code| match code {
+                lsp::NumberOrString::Number(code) => code.to_string(),
+                lsp::NumberOrString::String(code) => code.clone(),
+            });
+            let range = range_from_lsp(diagnostic.range);
+            let is_supporting = diagnostic
+                .related_information
+                .as_ref()
+                .map_or(false, |infos| {
+                    infos.iter().any(|info| {
+                        primary_diagnostic_group_ids.contains_key(&(
+                            source,
+                            code.clone(),
+                            range_from_lsp(info.location.range),
+                        ))
+                    })
+                });
+
+            if is_supporting {
+                if let Some(severity) = diagnostic.severity {
+                    supporting_diagnostic_severities
+                        .insert((source, code.clone(), range), severity);
+                }
+            } else {
+                let group_id = post_inc(&mut next_group_id);
+                let is_disk_based =
+                    source.map_or(false, |source| disk_based_sources.contains(source));
+
+                sources_by_group_id.insert(group_id, source);
+                primary_diagnostic_group_ids
+                    .insert((source, code.clone(), range.clone()), group_id);
+
+                diagnostics.push(DiagnosticEntry {
+                    range,
+                    diagnostic: Diagnostic {
+                        code: code.clone(),
+                        severity: diagnostic.severity.unwrap_or(DiagnosticSeverity::ERROR),
+                        message: diagnostic.message.clone(),
+                        group_id,
+                        is_primary: true,
+                        is_valid: true,
+                        is_disk_based,
+                    },
+                });
+                if let Some(infos) = &diagnostic.related_information {
+                    for info in infos {
+                        if info.location.uri == params.uri {
+                            let range = range_from_lsp(info.location.range);
+                            diagnostics.push(DiagnosticEntry {
+                                range,
+                                diagnostic: Diagnostic {
+                                    code: code.clone(),
+                                    severity: DiagnosticSeverity::INFORMATION,
+                                    message: info.message.clone(),
+                                    group_id,
+                                    is_primary: false,
+                                    is_valid: true,
+                                    is_disk_based,
+                                },
+                            });
+                        }
+                    }
+                }
+            }
+        }
+
+        for entry in &mut diagnostics {
+            let diagnostic = &mut entry.diagnostic;
+            if !diagnostic.is_primary {
+                let source = *sources_by_group_id.get(&diagnostic.group_id).unwrap();
+                if let Some(&severity) = supporting_diagnostic_severities.get(&(
+                    source,
+                    diagnostic.code.clone(),
+                    entry.range.clone(),
+                )) {
+                    diagnostic.severity = severity;
+                }
+            }
+        }
+
+        for buffer in self.open_buffers.values() {
+            if let Some(buffer) = buffer.upgrade(cx) {
+                if buffer
+                    .read(cx)
+                    .file()
+                    .map_or(false, |file| *file.path() == project_path.path)
+                {
+                    buffer.update(cx, |buffer, cx| {
+                        buffer.update_diagnostics(params.version, diagnostics.clone(), cx)
+                    })?;
+                    break;
+                }
+            }
+        }
+
         worktree.update(cx, |worktree, cx| {
-            worktree.as_local_mut().unwrap().update_diagnostics(
-                project_path.path.clone(),
-                diagnostics,
-                disk_based_sources,
-                cx,
-            )
+            worktree
+                .as_local_mut()
+                .ok_or_else(|| anyhow!("not a local worktree"))?
+                .update_diagnostics(project_path.path.clone(), diagnostics, cx)
         })?;
         cx.emit(Event::DiagnosticsUpdated(project_path));
         Ok(())
@@ -874,7 +1083,7 @@ impl Project {
         }
     }
 
-    fn add_local_worktree(
+    pub fn add_local_worktree(
         &self,
         abs_path: impl AsRef<Path>,
         weak: bool,
@@ -884,7 +1093,7 @@ impl Project {
         let client = self.client.clone();
         let path = Arc::from(abs_path.as_ref());
         cx.spawn(|project, mut cx| async move {
-            let worktree = Worktree::open_local(client.clone(), path, weak, fs, &mut cx).await?;
+            let worktree = Worktree::local(client.clone(), path, weak, fs, &mut cx).await?;
 
             let (remote_project_id, is_shared) = project.update(&mut cx, |project, cx| {
                 project.add_worktree(&worktree, cx);
@@ -921,6 +1130,10 @@ impl Project {
 
     fn add_worktree(&mut self, worktree: &ModelHandle<Worktree>, cx: &mut ModelContext<Self>) {
         cx.observe(&worktree, |_, _, cx| cx.notify()).detach();
+        cx.subscribe(&worktree, |this, worktree, _, cx| {
+            this.update_open_buffers(worktree, cx)
+        })
+        .detach();
 
         let push_weak_handle = {
             let worktree = worktree.read(cx);
@@ -942,6 +1155,74 @@ impl Project {
         cx.notify();
     }
 
+    fn update_open_buffers(
+        &mut self,
+        worktree_handle: ModelHandle<Worktree>,
+        cx: &mut ModelContext<Self>,
+    ) {
+        let local = worktree_handle.read(cx).is_local();
+        let snapshot = worktree_handle.read(cx).snapshot();
+        let worktree_path = snapshot.abs_path();
+        let mut buffers_to_delete = Vec::new();
+        for (buffer_id, buffer) in &self.open_buffers {
+            if let OpenBuffer::Loaded(buffer) = buffer {
+                if let Some(buffer) = buffer.upgrade(cx) {
+                    buffer.update(cx, |buffer, cx| {
+                        if let Some(old_file) = File::from_dyn(buffer.file()) {
+                            if old_file.worktree != worktree_handle {
+                                return;
+                            }
+
+                            let new_file = if let Some(entry) = old_file
+                                .entry_id
+                                .and_then(|entry_id| snapshot.entry_for_id(entry_id))
+                            {
+                                File {
+                                    is_local: local,
+                                    worktree_path: worktree_path.clone(),
+                                    entry_id: Some(entry.id),
+                                    mtime: entry.mtime,
+                                    path: entry.path.clone(),
+                                    worktree: worktree_handle.clone(),
+                                }
+                            } else if let Some(entry) =
+                                snapshot.entry_for_path(old_file.path().as_ref())
+                            {
+                                File {
+                                    is_local: local,
+                                    worktree_path: worktree_path.clone(),
+                                    entry_id: Some(entry.id),
+                                    mtime: entry.mtime,
+                                    path: entry.path.clone(),
+                                    worktree: worktree_handle.clone(),
+                                }
+                            } else {
+                                File {
+                                    is_local: local,
+                                    worktree_path: worktree_path.clone(),
+                                    entry_id: None,
+                                    path: old_file.path().clone(),
+                                    mtime: old_file.mtime(),
+                                    worktree: worktree_handle.clone(),
+                                }
+                            };
+
+                            if let Some(task) = buffer.file_updated(Box::new(new_file), cx) {
+                                task.detach();
+                            }
+                        }
+                    });
+                } else {
+                    buffers_to_delete.push(*buffer_id);
+                }
+            }
+        }
+
+        for buffer_id in buffers_to_delete {
+            self.open_buffers.remove(&buffer_id);
+        }
+    }
+
     pub fn set_active_path(&mut self, entry: Option<ProjectPath>, cx: &mut ModelContext<Self>) {
         let new_active_entry = entry.and_then(|project_path| {
             let worktree = self.worktree_for_id(project_path.worktree_id, cx)?;
@@ -1069,11 +1350,15 @@ impl Project {
             .remove(&peer_id)
             .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?
             .replica_id;
-        for worktree in self.worktrees(cx).collect::<Vec<_>>() {
-            worktree.update(cx, |worktree, cx| {
-                worktree.remove_collaborator(peer_id, replica_id, cx);
-            })
+        self.shared_buffers.remove(&peer_id);
+        for (_, buffer) in &self.open_buffers {
+            if let OpenBuffer::Loaded(buffer) = buffer {
+                if let Some(buffer) = buffer.upgrade(cx) {
+                    buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
+                }
+            }
         }
+        cx.notify();
         Ok(())
     }
 
@@ -1180,11 +1465,27 @@ impl Project {
         _: Arc<Client>,
         cx: &mut ModelContext<Self>,
     ) -> Result<()> {
-        let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
-        if let Some(worktree) = self.worktree_for_id(worktree_id, cx) {
-            worktree.update(cx, |worktree, cx| {
-                worktree.handle_update_buffer(envelope, cx)
-            })?;
+        let payload = envelope.payload.clone();
+        let buffer_id = payload.buffer_id as usize;
+        let ops = payload
+            .operations
+            .into_iter()
+            .map(|op| language::proto::deserialize_operation(op))
+            .collect::<Result<Vec<_>, _>>()?;
+        match self.open_buffers.get_mut(&buffer_id) {
+            Some(OpenBuffer::Operations(pending_ops)) => pending_ops.extend(ops),
+            Some(OpenBuffer::Loaded(buffer)) => {
+                if let Some(buffer) = buffer.upgrade(cx) {
+                    buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
+                } else {
+                    self.open_buffers
+                        .insert(buffer_id, OpenBuffer::Operations(ops));
+                }
+            }
+            None => {
+                self.open_buffers
+                    .insert(buffer_id, OpenBuffer::Operations(ops));
+            }
         }
         Ok(())
     }
@@ -1195,12 +1496,42 @@ impl Project {
         rpc: Arc<Client>,
         cx: &mut ModelContext<Self>,
     ) -> Result<()> {
-        let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
-        if let Some(worktree) = self.worktree_for_id(worktree_id, cx) {
-            worktree.update(cx, |worktree, cx| {
-                worktree.handle_save_buffer(envelope, rpc, cx)
-            })?;
-        }
+        let sender_id = envelope.original_sender_id()?;
+        let project_id = self.remote_id().ok_or_else(|| anyhow!("not connected"))?;
+        let buffer = self
+            .shared_buffers
+            .get(&sender_id)
+            .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned())
+            .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?;
+        let receipt = envelope.receipt();
+        let worktree_id = envelope.payload.worktree_id;
+        let buffer_id = envelope.payload.buffer_id;
+        let save = cx.spawn(|_, mut cx| async move {
+            buffer.update(&mut cx, |buffer, cx| buffer.save(cx)).await
+        });
+
+        cx.background()
+            .spawn(
+                async move {
+                    let (version, mtime) = save.await?;
+
+                    rpc.respond(
+                        receipt,
+                        proto::BufferSaved {
+                            project_id,
+                            worktree_id,
+                            buffer_id,
+                            version: (&version).into(),
+                            mtime: Some(mtime.into()),
+                        },
+                    )
+                    .await?;
+
+                    Ok(())
+                }
+                .log_err(),
+            )
+            .detach();
         Ok(())
     }
 
@@ -1210,12 +1541,36 @@ impl Project {
         rpc: Arc<Client>,
         cx: &mut ModelContext<Self>,
     ) -> Result<()> {
-        let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
-        if let Some(worktree) = self.worktree_for_id(worktree_id, cx) {
-            worktree.update(cx, |worktree, cx| {
-                worktree.handle_format_buffer(envelope, rpc, cx)
-            })?;
-        }
+        let receipt = envelope.receipt();
+        let sender_id = envelope.original_sender_id()?;
+        let buffer = self
+            .shared_buffers
+            .get(&sender_id)
+            .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned())
+            .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?;
+        cx.spawn(|_, mut cx| async move {
+            let format = buffer.update(&mut cx, |buffer, cx| buffer.format(cx)).await;
+            // We spawn here in order to enqueue the sending of `Ack` *after* transmission of edits
+            // associated with formatting.
+            cx.spawn(|_| async move {
+                match format {
+                    Ok(()) => rpc.respond(receipt, proto::Ack {}).await?,
+                    Err(error) => {
+                        rpc.respond_with_error(
+                            receipt,
+                            proto::Error {
+                                message: error.to_string(),
+                            },
+                        )
+                        .await?
+                    }
+                }
+                Ok::<_, anyhow::Error>(())
+            })
+            .await
+            .log_err();
+        })
+        .detach();
         Ok(())
     }
 
@@ -1228,28 +1583,30 @@ impl Project {
         let receipt = envelope.receipt();
         let peer_id = envelope.original_sender_id()?;
         let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
-        let worktree = self
-            .worktree_for_id(worktree_id, cx)
-            .ok_or_else(|| anyhow!("no such worktree"))?;
-
-        let task = self.open_buffer(
+        let open_buffer = self.open_buffer(
             ProjectPath {
                 worktree_id,
                 path: PathBuf::from(envelope.payload.path).into(),
             },
             cx,
         );
-        cx.spawn(|_, mut cx| {
+        cx.spawn(|this, mut cx| {
             async move {
-                let buffer = task.await?;
-                let response = worktree.update(&mut cx, |worktree, cx| {
-                    worktree
-                        .as_local_mut()
-                        .unwrap()
-                        .open_remote_buffer(peer_id, buffer, cx)
+                let buffer = open_buffer.await?;
+                this.update(&mut cx, |this, _| {
+                    this.shared_buffers
+                        .entry(peer_id)
+                        .or_default()
+                        .insert(buffer.id() as u64, buffer.clone());
                 });
-                rpc.respond(receipt, response).await?;
-                Ok(())
+                let message = buffer.read_with(&cx, |buffer, _| buffer.to_proto());
+                rpc.respond(
+                    receipt,
+                    proto::OpenBufferResponse {
+                        buffer: Some(message),
+                    },
+                )
+                .await
             }
             .log_err()
         })
@@ -1263,14 +1620,9 @@ impl Project {
         _: Arc<Client>,
         cx: &mut ModelContext<Self>,
     ) -> anyhow::Result<()> {
-        let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
-        if let Some(worktree) = self.worktree_for_id(worktree_id, cx) {
-            worktree.update(cx, |worktree, cx| {
-                worktree
-                    .as_local_mut()
-                    .unwrap()
-                    .close_remote_buffer(envelope, cx)
-            })?;
+        if let Some(shared_buffers) = self.shared_buffers.get_mut(&envelope.original_sender_id()?) {
+            shared_buffers.remove(&envelope.payload.buffer_id);
+            cx.notify();
         }
         Ok(())
     }
@@ -1281,10 +1633,26 @@ impl Project {
         _: Arc<Client>,
         cx: &mut ModelContext<Self>,
     ) -> Result<()> {
-        let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
-        if let Some(worktree) = self.worktree_for_id(worktree_id, cx) {
-            worktree.update(cx, |worktree, cx| {
-                worktree.handle_buffer_saved(envelope, cx)
+        let payload = envelope.payload.clone();
+        let buffer = self
+            .open_buffers
+            .get(&(payload.buffer_id as usize))
+            .and_then(|buf| {
+                if let OpenBuffer::Loaded(buffer) = buf {
+                    buffer.upgrade(cx)
+                } else {
+                    None
+                }
+            });
+        if let Some(buffer) = buffer {
+            buffer.update(cx, |buffer, cx| {
+                let version = payload.version.try_into()?;
+                let mtime = payload
+                    .mtime
+                    .ok_or_else(|| anyhow!("missing mtime"))?
+                    .into();
+                buffer.did_save(version, mtime, None, cx);
+                Result::<_, anyhow::Error>::Ok(())
             })?;
         }
         Ok(())
@@ -1474,6 +1842,15 @@ impl<P: AsRef<Path>> From<(WorktreeId, P)> for ProjectPath {
     }
 }
 
+impl OpenBuffer {
+    fn upgrade(&self, cx: &AppContext) -> Option<ModelHandle<Buffer>> {
+        match self {
+            OpenBuffer::Loaded(buffer) => buffer.upgrade(cx),
+            OpenBuffer::Operations(_) => None,
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::{Event, *};
@@ -1487,8 +1864,10 @@ mod tests {
     };
     use lsp::Url;
     use serde_json::json;
-    use std::{os::unix, path::PathBuf};
+    use std::{cell::RefCell, os::unix, path::PathBuf, rc::Rc};
+    use unindent::Unindent as _;
     use util::test::temp_tree;
+    use worktree::WorktreeHandle as _;
 
     #[gpui::test]
     async fn test_populate_and_search(mut cx: gpui::TestAppContext) {
@@ -1515,7 +1894,7 @@ mod tests {
         )
         .unwrap();
 
-        let project = build_project(&mut cx);
+        let project = build_project(Arc::new(RealFs), &mut cx);
 
         let (tree, _) = project
             .update(&mut cx, |project, cx| {
@@ -1660,8 +2039,8 @@ mod tests {
             Event::DiskBasedDiagnosticsFinished
         );
 
-        let (buffer, _) = tree
-            .update(&mut cx, |tree, cx| tree.open_buffer("a.rs", cx))
+        let buffer = project
+            .update(&mut cx, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
             .await
             .unwrap();
 
@@ -1697,7 +2076,7 @@ mod tests {
             }
         }));
 
-        let project = build_project(&mut cx);
+        let project = build_project(Arc::new(RealFs), &mut cx);
         let (tree, _) = project
             .update(&mut cx, |project, cx| {
                 project.find_or_create_worktree_for_abs_path(&dir.path(), false, cx)
@@ -1838,9 +2217,765 @@ mod tests {
         }
     }
 
-    fn build_project(cx: &mut TestAppContext) -> ModelHandle<Project> {
+    #[gpui::test]
+    async fn test_save_file(mut cx: gpui::TestAppContext) {
+        let fs = Arc::new(FakeFs::new());
+        fs.insert_tree(
+            "/dir",
+            json!({
+                "file1": "the old contents",
+            }),
+        )
+        .await;
+
+        let project = build_project(fs.clone(), &mut cx);
+        let worktree_id = project
+            .update(&mut cx, |p, cx| p.add_local_worktree("/dir", false, cx))
+            .await
+            .unwrap()
+            .read_with(&cx, |tree, _| tree.id());
+
+        let buffer = project
+            .update(&mut cx, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
+            .await
+            .unwrap();
+        buffer
+            .update(&mut cx, |buffer, cx| {
+                assert_eq!(buffer.text(), "the old contents");
+                buffer.edit(Some(0..0), "a line of text.\n".repeat(10 * 1024), cx);
+                buffer.save(cx)
+            })
+            .await
+            .unwrap();
+
+        let new_text = fs.load(Path::new("/dir/file1")).await.unwrap();
+        assert_eq!(new_text, buffer.read_with(&cx, |buffer, _| buffer.text()));
+    }
+
+    #[gpui::test]
+    async fn test_save_in_single_file_worktree(mut cx: gpui::TestAppContext) {
+        let fs = Arc::new(FakeFs::new());
+        fs.insert_tree(
+            "/dir",
+            json!({
+                "file1": "the old contents",
+            }),
+        )
+        .await;
+
+        let project = build_project(fs.clone(), &mut cx);
+        let worktree_id = project
+            .update(&mut cx, |p, cx| {
+                p.add_local_worktree("/dir/file1", false, cx)
+            })
+            .await
+            .unwrap()
+            .read_with(&cx, |tree, _| tree.id());
+
+        let buffer = project
+            .update(&mut cx, |p, cx| p.open_buffer((worktree_id, ""), cx))
+            .await
+            .unwrap();
+        buffer
+            .update(&mut cx, |buffer, cx| {
+                buffer.edit(Some(0..0), "a line of text.\n".repeat(10 * 1024), cx);
+                buffer.save(cx)
+            })
+            .await
+            .unwrap();
+
+        let new_text = fs.load(Path::new("/dir/file1")).await.unwrap();
+        assert_eq!(new_text, buffer.read_with(&cx, |buffer, _| buffer.text()));
+    }
+
+    #[gpui::test]
+    async fn test_rescan_and_remote_updates(mut cx: gpui::TestAppContext) {
+        let dir = temp_tree(json!({
+            "a": {
+                "file1": "",
+                "file2": "",
+                "file3": "",
+            },
+            "b": {
+                "c": {
+                    "file4": "",
+                    "file5": "",
+                }
+            }
+        }));
+
+        let project = build_project(Arc::new(RealFs), &mut cx);
+        let rpc = project.read_with(&cx, |p, _| p.client.clone());
+
+        let tree = project
+            .update(&mut cx, |p, cx| p.add_local_worktree(dir.path(), false, cx))
+            .await
+            .unwrap();
+        let worktree_id = tree.read_with(&cx, |tree, _| tree.id());
+
+        let buffer_for_path = |path: &'static str, cx: &mut gpui::TestAppContext| {
+            let buffer = project.update(cx, |p, cx| p.open_buffer((worktree_id, path), cx));
+            async move { buffer.await.unwrap() }
+        };
+        let id_for_path = |path: &'static str, cx: &gpui::TestAppContext| {
+            tree.read_with(cx, |tree, _| {
+                tree.entry_for_path(path)
+                    .expect(&format!("no entry for path {}", path))
+                    .id
+            })
+        };
+
+        let buffer2 = buffer_for_path("a/file2", &mut cx).await;
+        let buffer3 = buffer_for_path("a/file3", &mut cx).await;
+        let buffer4 = buffer_for_path("b/c/file4", &mut cx).await;
+        let buffer5 = buffer_for_path("b/c/file5", &mut cx).await;
+
+        let file2_id = id_for_path("a/file2", &cx);
+        let file3_id = id_for_path("a/file3", &cx);
+        let file4_id = id_for_path("b/c/file4", &cx);
+
+        // Wait for the initial scan.
+        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
+            .await;
+
+        // Create a remote copy of this worktree.
+        let initial_snapshot = tree.read_with(&cx, |tree, _| tree.snapshot());
+        let remote = Worktree::remote(
+            1,
+            1,
+            initial_snapshot.to_proto(&Default::default(), Default::default()),
+            rpc.clone(),
+            &mut cx.to_async(),
+        )
+        .await
+        .unwrap();
+
+        cx.read(|cx| {
+            assert!(!buffer2.read(cx).is_dirty());
+            assert!(!buffer3.read(cx).is_dirty());
+            assert!(!buffer4.read(cx).is_dirty());
+            assert!(!buffer5.read(cx).is_dirty());
+        });
+
+        // Rename and delete files and directories.
+        tree.flush_fs_events(&cx).await;
+        std::fs::rename(dir.path().join("a/file3"), dir.path().join("b/c/file3")).unwrap();
+        std::fs::remove_file(dir.path().join("b/c/file5")).unwrap();
+        std::fs::rename(dir.path().join("b/c"), dir.path().join("d")).unwrap();
+        std::fs::rename(dir.path().join("a/file2"), dir.path().join("a/file2.new")).unwrap();
+        tree.flush_fs_events(&cx).await;
+
+        let expected_paths = vec![
+            "a",
+            "a/file1",
+            "a/file2.new",
+            "b",
+            "d",
+            "d/file3",
+            "d/file4",
+        ];
+
+        cx.read(|app| {
+            assert_eq!(
+                tree.read(app)
+                    .paths()
+                    .map(|p| p.to_str().unwrap())
+                    .collect::<Vec<_>>(),
+                expected_paths
+            );
+
+            assert_eq!(id_for_path("a/file2.new", &cx), file2_id);
+            assert_eq!(id_for_path("d/file3", &cx), file3_id);
+            assert_eq!(id_for_path("d/file4", &cx), file4_id);
+
+            assert_eq!(
+                buffer2.read(app).file().unwrap().path().as_ref(),
+                Path::new("a/file2.new")
+            );
+            assert_eq!(
+                buffer3.read(app).file().unwrap().path().as_ref(),
+                Path::new("d/file3")
+            );
+            assert_eq!(
+                buffer4.read(app).file().unwrap().path().as_ref(),
+                Path::new("d/file4")
+            );
+            assert_eq!(
+                buffer5.read(app).file().unwrap().path().as_ref(),
+                Path::new("b/c/file5")
+            );
+
+            assert!(!buffer2.read(app).file().unwrap().is_deleted());
+            assert!(!buffer3.read(app).file().unwrap().is_deleted());
+            assert!(!buffer4.read(app).file().unwrap().is_deleted());
+            assert!(buffer5.read(app).file().unwrap().is_deleted());
+        });
+
+        // Update the remote worktree. Check that it becomes consistent with the
+        // local worktree.
+        remote.update(&mut cx, |remote, cx| {
+            let update_message =
+                tree.read(cx)
+                    .snapshot()
+                    .build_update(&initial_snapshot, 1, 1, true);
+            remote
+                .as_remote_mut()
+                .unwrap()
+                .snapshot
+                .apply_update(update_message)
+                .unwrap();
+
+            assert_eq!(
+                remote
+                    .paths()
+                    .map(|p| p.to_str().unwrap())
+                    .collect::<Vec<_>>(),
+                expected_paths
+            );
+        });
+    }
+
+    #[gpui::test]
+    async fn test_buffer_deduping(mut cx: gpui::TestAppContext) {
+        let fs = Arc::new(FakeFs::new());
+        fs.insert_tree(
+            "/the-dir",
+            json!({
+                "a.txt": "a-contents",
+                "b.txt": "b-contents",
+            }),
+        )
+        .await;
+
+        let project = build_project(fs.clone(), &mut cx);
+        let worktree_id = project
+            .update(&mut cx, |p, cx| p.add_local_worktree("/the-dir", false, cx))
+            .await
+            .unwrap()
+            .read_with(&cx, |tree, _| tree.id());
+
+        // Spawn multiple tasks to open paths, repeating some paths.
+        let (buffer_a_1, buffer_b, buffer_a_2) = project.update(&mut cx, |p, cx| {
+            (
+                p.open_buffer((worktree_id, "a.txt"), cx),
+                p.open_buffer((worktree_id, "b.txt"), cx),
+                p.open_buffer((worktree_id, "a.txt"), cx),
+            )
+        });
+
+        let buffer_a_1 = buffer_a_1.await.unwrap();
+        let buffer_a_2 = buffer_a_2.await.unwrap();
+        let buffer_b = buffer_b.await.unwrap();
+        assert_eq!(buffer_a_1.read_with(&cx, |b, _| b.text()), "a-contents");
+        assert_eq!(buffer_b.read_with(&cx, |b, _| b.text()), "b-contents");
+
+        // There is only one buffer per path.
+        let buffer_a_id = buffer_a_1.id();
+        assert_eq!(buffer_a_2.id(), buffer_a_id);
+
+        // Open the same path again while it is still open.
+        drop(buffer_a_1);
+        let buffer_a_3 = project
+            .update(&mut cx, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
+            .await
+            .unwrap();
+
+        // There's still only one buffer per path.
+        assert_eq!(buffer_a_3.id(), buffer_a_id);
+    }
+
+    #[gpui::test]
+    async fn test_buffer_is_dirty(mut cx: gpui::TestAppContext) {
+        use std::fs;
+
+        let dir = temp_tree(json!({
+            "file1": "abc",
+            "file2": "def",
+            "file3": "ghi",
+        }));
+
+        let project = build_project(Arc::new(RealFs), &mut cx);
+        let worktree = project
+            .update(&mut cx, |p, cx| p.add_local_worktree(dir.path(), false, cx))
+            .await
+            .unwrap();
+        let worktree_id = worktree.read_with(&cx, |worktree, _| worktree.id());
+
+        worktree.flush_fs_events(&cx).await;
+        worktree
+            .read_with(&cx, |t, _| t.as_local().unwrap().scan_complete())
+            .await;
+
+        let buffer1 = project
+            .update(&mut cx, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
+            .await
+            .unwrap();
+        let events = Rc::new(RefCell::new(Vec::new()));
+
+        // initially, the buffer isn't dirty.
+        buffer1.update(&mut cx, |buffer, cx| {
+            cx.subscribe(&buffer1, {
+                let events = events.clone();
+                move |_, _, event, _| events.borrow_mut().push(event.clone())
+            })
+            .detach();
+
+            assert!(!buffer.is_dirty());
+            assert!(events.borrow().is_empty());
+
+            buffer.edit(vec![1..2], "", cx);
+        });
+
+        // after the first edit, the buffer is dirty, and emits a dirtied event.
+        buffer1.update(&mut cx, |buffer, cx| {
+            assert!(buffer.text() == "ac");
+            assert!(buffer.is_dirty());
+            assert_eq!(
+                *events.borrow(),
+                &[language::Event::Edited, language::Event::Dirtied]
+            );
+            events.borrow_mut().clear();
+            buffer.did_save(buffer.version(), buffer.file().unwrap().mtime(), None, cx);
+        });
+
+        // after saving, the buffer is not dirty, and emits a saved event.
+        buffer1.update(&mut cx, |buffer, cx| {
+            assert!(!buffer.is_dirty());
+            assert_eq!(*events.borrow(), &[language::Event::Saved]);
+            events.borrow_mut().clear();
+
+            buffer.edit(vec![1..1], "B", cx);
+            buffer.edit(vec![2..2], "D", cx);
+        });
+
+        // after editing again, the buffer is dirty, and emits another dirty event.
+        buffer1.update(&mut cx, |buffer, cx| {
+            assert!(buffer.text() == "aBDc");
+            assert!(buffer.is_dirty());
+            assert_eq!(
+                *events.borrow(),
+                &[
+                    language::Event::Edited,
+                    language::Event::Dirtied,
+                    language::Event::Edited,
+                ],
+            );
+            events.borrow_mut().clear();
+
+            // TODO - currently, after restoring the buffer to its
+            // previously-saved state, the is still considered dirty.
+            buffer.edit([1..3], "", cx);
+            assert!(buffer.text() == "ac");
+            assert!(buffer.is_dirty());
+        });
+
+        assert_eq!(*events.borrow(), &[language::Event::Edited]);
+
+        // When a file is deleted, the buffer is considered dirty.
+        let events = Rc::new(RefCell::new(Vec::new()));
+        let buffer2 = project
+            .update(&mut cx, |p, cx| p.open_buffer((worktree_id, "file2"), cx))
+            .await
+            .unwrap();
+        buffer2.update(&mut cx, |_, cx| {
+            cx.subscribe(&buffer2, {
+                let events = events.clone();
+                move |_, _, event, _| events.borrow_mut().push(event.clone())
+            })
+            .detach();
+        });
+
+        fs::remove_file(dir.path().join("file2")).unwrap();
+        buffer2.condition(&cx, |b, _| b.is_dirty()).await;
+        assert_eq!(
+            *events.borrow(),
+            &[language::Event::Dirtied, language::Event::FileHandleChanged]
+        );
+
+        // When a file is already dirty when deleted, we don't emit a Dirtied event.
+        let events = Rc::new(RefCell::new(Vec::new()));
+        let buffer3 = project
+            .update(&mut cx, |p, cx| p.open_buffer((worktree_id, "file3"), cx))
+            .await
+            .unwrap();
+        buffer3.update(&mut cx, |_, cx| {
+            cx.subscribe(&buffer3, {
+                let events = events.clone();
+                move |_, _, event, _| events.borrow_mut().push(event.clone())
+            })
+            .detach();
+        });
+
+        worktree.flush_fs_events(&cx).await;
+        buffer3.update(&mut cx, |buffer, cx| {
+            buffer.edit(Some(0..0), "x", cx);
+        });
+        events.borrow_mut().clear();
+        fs::remove_file(dir.path().join("file3")).unwrap();
+        buffer3
+            .condition(&cx, |_, _| !events.borrow().is_empty())
+            .await;
+        assert_eq!(*events.borrow(), &[language::Event::FileHandleChanged]);
+        cx.read(|cx| assert!(buffer3.read(cx).is_dirty()));
+    }
+
+    #[gpui::test]
+    async fn test_buffer_file_changes_on_disk(mut cx: gpui::TestAppContext) {
+        use std::fs;
+
+        let initial_contents = "aaa\nbbbbb\nc\n";
+        let dir = temp_tree(json!({ "the-file": initial_contents }));
+
+        let project = build_project(Arc::new(RealFs), &mut cx);
+        let worktree = project
+            .update(&mut cx, |p, cx| p.add_local_worktree(dir.path(), false, cx))
+            .await
+            .unwrap();
+        let worktree_id = worktree.read_with(&cx, |tree, _| tree.id());
+
+        worktree
+            .read_with(&cx, |t, _| t.as_local().unwrap().scan_complete())
+            .await;
+
+        let abs_path = dir.path().join("the-file");
+        let buffer = project
+            .update(&mut cx, |p, cx| {
+                p.open_buffer((worktree_id, "the-file"), cx)
+            })
+            .await
+            .unwrap();
+
+        // TODO
+        // Add a cursor on each row.
+        // let selection_set_id = buffer.update(&mut cx, |buffer, cx| {
+        //     assert!(!buffer.is_dirty());
+        //     buffer.add_selection_set(
+        //         &(0..3)
+        //             .map(|row| Selection {
+        //                 id: row as usize,
+        //                 start: Point::new(row, 1),
+        //                 end: Point::new(row, 1),
+        //                 reversed: false,
+        //                 goal: SelectionGoal::None,
+        //             })
+        //             .collect::<Vec<_>>(),
+        //         cx,
+        //     )
+        // });
+
+        // Change the file on disk, adding two new lines of text, and removing
+        // one line.
+        buffer.read_with(&cx, |buffer, _| {
+            assert!(!buffer.is_dirty());
+            assert!(!buffer.has_conflict());
+        });
+        let new_contents = "AAAA\naaa\nBB\nbbbbb\n";
+        fs::write(&abs_path, new_contents).unwrap();
+
+        // Because the buffer was not modified, it is reloaded from disk. Its
+        // contents are edited according to the diff between the old and new
+        // file contents.
+        buffer
+            .condition(&cx, |buffer, _| buffer.text() == new_contents)
+            .await;
+
+        buffer.update(&mut cx, |buffer, _| {
+            assert_eq!(buffer.text(), new_contents);
+            assert!(!buffer.is_dirty());
+            assert!(!buffer.has_conflict());
+
+            // TODO
+            // let cursor_positions = buffer
+            //     .selection_set(selection_set_id)
+            //     .unwrap()
+            //     .selections::<Point>(&*buffer)
+            //     .map(|selection| {
+            //         assert_eq!(selection.start, selection.end);
+            //         selection.start
+            //     })
+            //     .collect::<Vec<_>>();
+            // assert_eq!(
+            //     cursor_positions,
+            //     [Point::new(1, 1), Point::new(3, 1), Point::new(4, 0)]
+            // );
+        });
+
+        // Modify the buffer
+        buffer.update(&mut cx, |buffer, cx| {
+            buffer.edit(vec![0..0], " ", cx);
+            assert!(buffer.is_dirty());
+            assert!(!buffer.has_conflict());
+        });
+
+        // Change the file on disk again, adding blank lines to the beginning.
+        fs::write(&abs_path, "\n\n\nAAAA\naaa\nBB\nbbbbb\n").unwrap();
+
+        // Because the buffer is modified, it doesn't reload from disk, but is
+        // marked as having a conflict.
+        buffer
+            .condition(&cx, |buffer, _| buffer.has_conflict())
+            .await;
+    }
+
+    #[gpui::test]
+    async fn test_grouped_diagnostics(mut cx: gpui::TestAppContext) {
+        let fs = Arc::new(FakeFs::new());
+        fs.insert_tree(
+            "/the-dir",
+            json!({
+                "a.rs": "
+                    fn foo(mut v: Vec<usize>) {
+                        for x in &v {
+                            v.push(1);
+                        }
+                    }
+                "
+                .unindent(),
+            }),
+        )
+        .await;
+
+        let project = build_project(fs.clone(), &mut cx);
+        let worktree = project
+            .update(&mut cx, |p, cx| p.add_local_worktree("/the-dir", false, cx))
+            .await
+            .unwrap();
+        let worktree_id = worktree.read_with(&cx, |tree, _| tree.id());
+
+        let buffer = project
+            .update(&mut cx, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
+            .await
+            .unwrap();
+
+        let buffer_uri = Url::from_file_path("/the-dir/a.rs").unwrap();
+        let message = lsp::PublishDiagnosticsParams {
+            uri: buffer_uri.clone(),
+            diagnostics: vec![
+                lsp::Diagnostic {
+                    range: lsp::Range::new(lsp::Position::new(1, 8), lsp::Position::new(1, 9)),
+                    severity: Some(DiagnosticSeverity::WARNING),
+                    message: "error 1".to_string(),
+                    related_information: Some(vec![lsp::DiagnosticRelatedInformation {
+                        location: lsp::Location {
+                            uri: buffer_uri.clone(),
+                            range: lsp::Range::new(
+                                lsp::Position::new(1, 8),
+                                lsp::Position::new(1, 9),
+                            ),
+                        },
+                        message: "error 1 hint 1".to_string(),
+                    }]),
+                    ..Default::default()
+                },
+                lsp::Diagnostic {
+                    range: lsp::Range::new(lsp::Position::new(1, 8), lsp::Position::new(1, 9)),
+                    severity: Some(DiagnosticSeverity::HINT),
+                    message: "error 1 hint 1".to_string(),
+                    related_information: Some(vec![lsp::DiagnosticRelatedInformation {
+                        location: lsp::Location {
+                            uri: buffer_uri.clone(),
+                            range: lsp::Range::new(
+                                lsp::Position::new(1, 8),
+                                lsp::Position::new(1, 9),
+                            ),
+                        },
+                        message: "original diagnostic".to_string(),
+                    }]),
+                    ..Default::default()
+                },
+                lsp::Diagnostic {
+                    range: lsp::Range::new(lsp::Position::new(2, 8), lsp::Position::new(2, 17)),
+                    severity: Some(DiagnosticSeverity::ERROR),
+                    message: "error 2".to_string(),
+                    related_information: Some(vec![
+                        lsp::DiagnosticRelatedInformation {
+                            location: lsp::Location {
+                                uri: buffer_uri.clone(),
+                                range: lsp::Range::new(
+                                    lsp::Position::new(1, 13),
+                                    lsp::Position::new(1, 15),
+                                ),
+                            },
+                            message: "error 2 hint 1".to_string(),
+                        },
+                        lsp::DiagnosticRelatedInformation {
+                            location: lsp::Location {
+                                uri: buffer_uri.clone(),
+                                range: lsp::Range::new(
+                                    lsp::Position::new(1, 13),
+                                    lsp::Position::new(1, 15),
+                                ),
+                            },
+                            message: "error 2 hint 2".to_string(),
+                        },
+                    ]),
+                    ..Default::default()
+                },
+                lsp::Diagnostic {
+                    range: lsp::Range::new(lsp::Position::new(1, 13), lsp::Position::new(1, 15)),
+                    severity: Some(DiagnosticSeverity::HINT),
+                    message: "error 2 hint 1".to_string(),
+                    related_information: Some(vec![lsp::DiagnosticRelatedInformation {
+                        location: lsp::Location {
+                            uri: buffer_uri.clone(),
+                            range: lsp::Range::new(
+                                lsp::Position::new(2, 8),
+                                lsp::Position::new(2, 17),
+                            ),
+                        },
+                        message: "original diagnostic".to_string(),
+                    }]),
+                    ..Default::default()
+                },
+                lsp::Diagnostic {
+                    range: lsp::Range::new(lsp::Position::new(1, 13), lsp::Position::new(1, 15)),
+                    severity: Some(DiagnosticSeverity::HINT),
+                    message: "error 2 hint 2".to_string(),
+                    related_information: Some(vec![lsp::DiagnosticRelatedInformation {
+                        location: lsp::Location {
+                            uri: buffer_uri.clone(),
+                            range: lsp::Range::new(
+                                lsp::Position::new(2, 8),
+                                lsp::Position::new(2, 17),
+                            ),
+                        },
+                        message: "original diagnostic".to_string(),
+                    }]),
+                    ..Default::default()
+                },
+            ],
+            version: None,
+        };
+
+        project
+            .update(&mut cx, |p, cx| {
+                p.update_diagnostics(message, &Default::default(), cx)
+            })
+            .unwrap();
+        let buffer = buffer.read_with(&cx, |buffer, _| buffer.snapshot());
+
+        assert_eq!(
+            buffer
+                .diagnostics_in_range::<_, Point>(0..buffer.len())
+                .collect::<Vec<_>>(),
+            &[
+                DiagnosticEntry {
+                    range: Point::new(1, 8)..Point::new(1, 9),
+                    diagnostic: Diagnostic {
+                        severity: DiagnosticSeverity::WARNING,
+                        message: "error 1".to_string(),
+                        group_id: 0,
+                        is_primary: true,
+                        ..Default::default()
+                    }
+                },
+                DiagnosticEntry {
+                    range: Point::new(1, 8)..Point::new(1, 9),
+                    diagnostic: Diagnostic {
+                        severity: DiagnosticSeverity::HINT,
+                        message: "error 1 hint 1".to_string(),
+                        group_id: 0,
+                        is_primary: false,
+                        ..Default::default()
+                    }
+                },
+                DiagnosticEntry {
+                    range: Point::new(1, 13)..Point::new(1, 15),
+                    diagnostic: Diagnostic {
+                        severity: DiagnosticSeverity::HINT,
+                        message: "error 2 hint 1".to_string(),
+                        group_id: 1,
+                        is_primary: false,
+                        ..Default::default()
+                    }
+                },
+                DiagnosticEntry {
+                    range: Point::new(1, 13)..Point::new(1, 15),
+                    diagnostic: Diagnostic {
+                        severity: DiagnosticSeverity::HINT,
+                        message: "error 2 hint 2".to_string(),
+                        group_id: 1,
+                        is_primary: false,
+                        ..Default::default()
+                    }
+                },
+                DiagnosticEntry {
+                    range: Point::new(2, 8)..Point::new(2, 17),
+                    diagnostic: Diagnostic {
+                        severity: DiagnosticSeverity::ERROR,
+                        message: "error 2".to_string(),
+                        group_id: 1,
+                        is_primary: true,
+                        ..Default::default()
+                    }
+                }
+            ]
+        );
+
+        assert_eq!(
+            buffer.diagnostic_group::<Point>(0).collect::<Vec<_>>(),
+            &[
+                DiagnosticEntry {
+                    range: Point::new(1, 8)..Point::new(1, 9),
+                    diagnostic: Diagnostic {
+                        severity: DiagnosticSeverity::WARNING,
+                        message: "error 1".to_string(),
+                        group_id: 0,
+                        is_primary: true,
+                        ..Default::default()
+                    }
+                },
+                DiagnosticEntry {
+                    range: Point::new(1, 8)..Point::new(1, 9),
+                    diagnostic: Diagnostic {
+                        severity: DiagnosticSeverity::HINT,
+                        message: "error 1 hint 1".to_string(),
+                        group_id: 0,
+                        is_primary: false,
+                        ..Default::default()
+                    }
+                },
+            ]
+        );
+        assert_eq!(
+            buffer.diagnostic_group::<Point>(1).collect::<Vec<_>>(),
+            &[
+                DiagnosticEntry {
+                    range: Point::new(1, 13)..Point::new(1, 15),
+                    diagnostic: Diagnostic {
+                        severity: DiagnosticSeverity::HINT,
+                        message: "error 2 hint 1".to_string(),
+                        group_id: 1,
+                        is_primary: false,
+                        ..Default::default()
+                    }
+                },
+                DiagnosticEntry {
+                    range: Point::new(1, 13)..Point::new(1, 15),
+                    diagnostic: Diagnostic {
+                        severity: DiagnosticSeverity::HINT,
+                        message: "error 2 hint 2".to_string(),
+                        group_id: 1,
+                        is_primary: false,
+                        ..Default::default()
+                    }
+                },
+                DiagnosticEntry {
+                    range: Point::new(2, 8)..Point::new(2, 17),
+                    diagnostic: Diagnostic {
+                        severity: DiagnosticSeverity::ERROR,
+                        message: "error 2".to_string(),
+                        group_id: 1,
+                        is_primary: true,
+                        ..Default::default()
+                    }
+                }
+            ]
+        );
+    }
+
+    fn build_project(fs: Arc<dyn Fs>, cx: &mut TestAppContext) -> ModelHandle<Project> {
         let languages = Arc::new(LanguageRegistry::new());
-        let fs = Arc::new(RealFs);
         let http_client = FakeHttpClient::with_404_response();
         let client = client::Client::new(http_client.clone());
         let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx));

crates/project/src/worktree.rs 🔗

@@ -5,19 +5,16 @@ use super::{
 };
 use ::ignore::gitignore::{Gitignore, GitignoreBuilder};
 use anyhow::{anyhow, Result};
-use client::{proto, Client, PeerId, TypedEnvelope};
+use client::{proto, Client, TypedEnvelope};
 use clock::ReplicaId;
-use collections::{hash_map, HashMap, HashSet};
+use collections::HashMap;
 use futures::{Stream, StreamExt};
 use fuzzy::CharBag;
 use gpui::{
     executor, AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext,
-    Task, UpgradeModelHandle, WeakModelHandle,
-};
-use language::{
-    range_from_lsp, Buffer, Diagnostic, DiagnosticEntry, DiagnosticSeverity, File as _, Operation,
-    PointUtf16, Rope,
+    Task,
 };
+use language::{Buffer, DiagnosticEntry, Operation, PointUtf16, Rope};
 use lazy_static::lazy_static;
 use parking_lot::Mutex;
 use postage::{
@@ -36,14 +33,14 @@ use std::{
     ops::Deref,
     path::{Path, PathBuf},
     sync::{
-        atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
+        atomic::{AtomicUsize, Ordering::SeqCst},
         Arc,
     },
     time::{Duration, SystemTime},
 };
 use sum_tree::{Bias, TreeMap};
 use sum_tree::{Edit, SeekTarget, SumTree};
-use util::{post_inc, ResultExt, TryFutureExt};
+use util::ResultExt;
 
 lazy_static! {
     static ref GITIGNORE: &'static OsStr = OsStr::new(".gitignore");
@@ -64,8 +61,12 @@ pub enum Worktree {
     Remote(RemoteWorktree),
 }
 
+pub enum Event {
+    UpdatedEntries,
+}
+
 impl Entity for Worktree {
-    type Event = ();
+    type Event = Event;
 
     fn release(&mut self, cx: &mut MutableAppContext) {
         if let Some(worktree) = self.as_local_mut() {
@@ -87,7 +88,7 @@ impl Entity for Worktree {
 }
 
 impl Worktree {
-    pub async fn open_local(
+    pub async fn local(
         client: Arc<Client>,
         path: impl Into<Arc<Path>>,
         weak: bool,
@@ -219,8 +220,6 @@ impl Worktree {
                     snapshot_rx,
                     updates_tx,
                     client: client.clone(),
-                    loading_buffers: Default::default(),
-                    open_buffers: Default::default(),
                     queued_operations: Default::default(),
                     diagnostic_summaries,
                     weak,
@@ -288,18 +287,6 @@ impl Worktree {
         }
     }
 
-    pub fn remove_collaborator(
-        &mut self,
-        peer_id: PeerId,
-        replica_id: ReplicaId,
-        cx: &mut ModelContext<Self>,
-    ) {
-        match self {
-            Worktree::Local(worktree) => worktree.remove_collaborator(peer_id, replica_id, cx),
-            Worktree::Remote(worktree) => worktree.remove_collaborator(replica_id, cx),
-        }
-    }
-
     pub fn diagnostic_summaries<'a>(
         &'a self,
     ) -> impl Iterator<Item = (Arc<Path>, DiagnosticSummary)> + 'a {
@@ -311,267 +298,6 @@ impl Worktree {
         .map(|(path, summary)| (path.0.clone(), summary.clone()))
     }
 
-    pub(crate) fn loading_buffers<'a>(&'a mut self) -> &'a mut LoadingBuffers {
-        match self {
-            Worktree::Local(worktree) => &mut worktree.loading_buffers,
-            Worktree::Remote(worktree) => &mut worktree.loading_buffers,
-        }
-    }
-
-    pub(crate) fn open_buffer(
-        &mut self,
-        path: impl AsRef<Path>,
-        cx: &mut ModelContext<Self>,
-    ) -> Task<Result<(ModelHandle<Buffer>, bool)>> {
-        let path = path.as_ref();
-
-        // If there is already a buffer for the given path, then return it.
-        let existing_buffer = match self {
-            Worktree::Local(worktree) => worktree.get_open_buffer(path, cx),
-            Worktree::Remote(worktree) => worktree.get_open_buffer(path, cx),
-        };
-        if let Some(existing_buffer) = existing_buffer {
-            return cx.spawn(move |_, _| async move { Ok((existing_buffer, false)) });
-        }
-
-        let is_new = Arc::new(AtomicBool::new(true));
-        let path: Arc<Path> = Arc::from(path);
-        let mut loading_watch = match self.loading_buffers().entry(path.clone()) {
-            // If the given path is already being loaded, then wait for that existing
-            // task to complete and return the same buffer.
-            hash_map::Entry::Occupied(e) => e.get().clone(),
-
-            // Otherwise, record the fact that this path is now being loaded.
-            hash_map::Entry::Vacant(entry) => {
-                let (mut tx, rx) = postage::watch::channel();
-                entry.insert(rx.clone());
-
-                let load_buffer = match self {
-                    Worktree::Local(worktree) => worktree.open_buffer(&path, cx),
-                    Worktree::Remote(worktree) => worktree.open_buffer(&path, cx),
-                };
-                cx.spawn(move |this, mut cx| async move {
-                    let result = load_buffer.await;
-
-                    // After the buffer loads, record the fact that it is no longer
-                    // loading.
-                    this.update(&mut cx, |this, _| this.loading_buffers().remove(&path));
-                    *tx.borrow_mut() = Some(match result {
-                        Ok(buffer) => Ok((buffer, is_new)),
-                        Err(error) => Err(Arc::new(error)),
-                    });
-                })
-                .detach();
-                rx
-            }
-        };
-
-        cx.spawn(|_, _| async move {
-            loop {
-                if let Some(result) = loading_watch.borrow().as_ref() {
-                    return match result {
-                        Ok((buf, is_new)) => Ok((buf.clone(), is_new.fetch_and(false, SeqCst))),
-                        Err(error) => Err(anyhow!("{}", error)),
-                    };
-                }
-                loading_watch.recv().await;
-            }
-        })
-    }
-
-    #[cfg(any(test, feature = "test-support"))]
-    pub(crate) fn has_open_buffer(&self, path: impl AsRef<Path>, cx: &AppContext) -> bool {
-        let mut open_buffers: Box<dyn Iterator<Item = _>> = match self {
-            Worktree::Local(worktree) => Box::new(worktree.open_buffers.values()),
-            Worktree::Remote(worktree) => {
-                Box::new(worktree.open_buffers.values().filter_map(|buf| {
-                    if let RemoteBuffer::Loaded(buf) = buf {
-                        Some(buf)
-                    } else {
-                        None
-                    }
-                }))
-            }
-        };
-
-        let path = path.as_ref();
-        open_buffers
-            .find(|buffer| {
-                if let Some(file) = buffer.upgrade(cx).and_then(|buffer| buffer.read(cx).file()) {
-                    file.path().as_ref() == path
-                } else {
-                    false
-                }
-            })
-            .is_some()
-    }
-
-    pub fn handle_update_buffer(
-        &mut self,
-        envelope: TypedEnvelope<proto::UpdateBuffer>,
-        cx: &mut ModelContext<Self>,
-    ) -> Result<()> {
-        let payload = envelope.payload.clone();
-        let buffer_id = payload.buffer_id as usize;
-        let ops = payload
-            .operations
-            .into_iter()
-            .map(|op| language::proto::deserialize_operation(op))
-            .collect::<Result<Vec<_>, _>>()?;
-
-        match self {
-            Worktree::Local(worktree) => {
-                let buffer = worktree
-                    .open_buffers
-                    .get(&buffer_id)
-                    .and_then(|buf| buf.upgrade(cx))
-                    .ok_or_else(|| {
-                        anyhow!("invalid buffer {} in update buffer message", buffer_id)
-                    })?;
-                buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
-            }
-            Worktree::Remote(worktree) => match worktree.open_buffers.get_mut(&buffer_id) {
-                Some(RemoteBuffer::Operations(pending_ops)) => pending_ops.extend(ops),
-                Some(RemoteBuffer::Loaded(buffer)) => {
-                    if let Some(buffer) = buffer.upgrade(cx) {
-                        buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
-                    } else {
-                        worktree
-                            .open_buffers
-                            .insert(buffer_id, RemoteBuffer::Operations(ops));
-                    }
-                }
-                None => {
-                    worktree
-                        .open_buffers
-                        .insert(buffer_id, RemoteBuffer::Operations(ops));
-                }
-            },
-        }
-
-        Ok(())
-    }
-
-    pub fn handle_save_buffer(
-        &mut self,
-        envelope: TypedEnvelope<proto::SaveBuffer>,
-        rpc: Arc<Client>,
-        cx: &mut ModelContext<Self>,
-    ) -> Result<()> {
-        let sender_id = envelope.original_sender_id()?;
-        let this = self.as_local().unwrap();
-        let project_id = this
-            .share
-            .as_ref()
-            .ok_or_else(|| anyhow!("can't save buffer while disconnected"))?
-            .project_id;
-
-        let buffer = this
-            .shared_buffers
-            .get(&sender_id)
-            .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned())
-            .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?;
-
-        let receipt = envelope.receipt();
-        let worktree_id = envelope.payload.worktree_id;
-        let buffer_id = envelope.payload.buffer_id;
-        let save = cx.spawn(|_, mut cx| async move {
-            buffer.update(&mut cx, |buffer, cx| buffer.save(cx)).await
-        });
-
-        cx.background()
-            .spawn(
-                async move {
-                    let (version, mtime) = save.await?;
-
-                    rpc.respond(
-                        receipt,
-                        proto::BufferSaved {
-                            project_id,
-                            worktree_id,
-                            buffer_id,
-                            version: (&version).into(),
-                            mtime: Some(mtime.into()),
-                        },
-                    )
-                    .await?;
-
-                    Ok(())
-                }
-                .log_err(),
-            )
-            .detach();
-
-        Ok(())
-    }
-
-    pub fn handle_buffer_saved(
-        &mut self,
-        envelope: TypedEnvelope<proto::BufferSaved>,
-        cx: &mut ModelContext<Self>,
-    ) -> Result<()> {
-        let payload = envelope.payload.clone();
-        let worktree = self.as_remote_mut().unwrap();
-        if let Some(buffer) = worktree
-            .open_buffers
-            .get(&(payload.buffer_id as usize))
-            .and_then(|buf| buf.upgrade(cx))
-        {
-            buffer.update(cx, |buffer, cx| {
-                let version = payload.version.try_into()?;
-                let mtime = payload
-                    .mtime
-                    .ok_or_else(|| anyhow!("missing mtime"))?
-                    .into();
-                buffer.did_save(version, mtime, None, cx);
-                Result::<_, anyhow::Error>::Ok(())
-            })?;
-        }
-        Ok(())
-    }
-
-    pub fn handle_format_buffer(
-        &mut self,
-        envelope: TypedEnvelope<proto::FormatBuffer>,
-        rpc: Arc<Client>,
-        cx: &mut ModelContext<Self>,
-    ) -> Result<()> {
-        let sender_id = envelope.original_sender_id()?;
-        let this = self.as_local().unwrap();
-        let buffer = this
-            .shared_buffers
-            .get(&sender_id)
-            .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned())
-            .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?;
-
-        let receipt = envelope.receipt();
-        cx.spawn(|_, mut cx| async move {
-            let format = buffer.update(&mut cx, |buffer, cx| buffer.format(cx)).await;
-            // We spawn here in order to enqueue the sending of `Ack` *after* transmission of edits
-            // associated with formatting.
-            cx.spawn(|_| async move {
-                match format {
-                    Ok(()) => rpc.respond(receipt, proto::Ack {}).await?,
-                    Err(error) => {
-                        rpc.respond_with_error(
-                            receipt,
-                            proto::Error {
-                                message: error.to_string(),
-                            },
-                        )
-                        .await?
-                    }
-                }
-                Ok::<_, anyhow::Error>(())
-            })
-            .await
-            .log_err();
-        })
-        .detach();
-
-        Ok(())
-    }
-
     fn poll_snapshot(&mut self, cx: &mut ModelContext<Self>) {
         match self {
             Self::Local(worktree) => {
@@ -593,94 +319,18 @@ impl Worktree {
                     }
                 } else {
                     worktree.poll_task.take();
-                    self.update_open_buffers(cx);
+                    cx.emit(Event::UpdatedEntries);
                 }
             }
             Self::Remote(worktree) => {
                 worktree.snapshot = worktree.snapshot_rx.borrow().clone();
-                self.update_open_buffers(cx);
+                cx.emit(Event::UpdatedEntries);
             }
         };
 
         cx.notify();
     }
 
-    fn update_open_buffers(&mut self, cx: &mut ModelContext<Self>) {
-        let open_buffers: Box<dyn Iterator<Item = _>> = match &self {
-            Self::Local(worktree) => Box::new(worktree.open_buffers.iter()),
-            Self::Remote(worktree) => {
-                Box::new(worktree.open_buffers.iter().filter_map(|(id, buf)| {
-                    if let RemoteBuffer::Loaded(buf) = buf {
-                        Some((id, buf))
-                    } else {
-                        None
-                    }
-                }))
-            }
-        };
-
-        let local = self.as_local().is_some();
-        let worktree_path = self.abs_path.clone();
-        let worktree_handle = cx.handle();
-        let mut buffers_to_delete = Vec::new();
-        for (buffer_id, buffer) in open_buffers {
-            if let Some(buffer) = buffer.upgrade(cx) {
-                buffer.update(cx, |buffer, cx| {
-                    if let Some(old_file) = File::from_dyn(buffer.file()) {
-                        let new_file = if let Some(entry) = old_file
-                            .entry_id
-                            .and_then(|entry_id| self.entry_for_id(entry_id))
-                        {
-                            File {
-                                is_local: local,
-                                worktree_path: worktree_path.clone(),
-                                entry_id: Some(entry.id),
-                                mtime: entry.mtime,
-                                path: entry.path.clone(),
-                                worktree: worktree_handle.clone(),
-                            }
-                        } else if let Some(entry) = self.entry_for_path(old_file.path().as_ref()) {
-                            File {
-                                is_local: local,
-                                worktree_path: worktree_path.clone(),
-                                entry_id: Some(entry.id),
-                                mtime: entry.mtime,
-                                path: entry.path.clone(),
-                                worktree: worktree_handle.clone(),
-                            }
-                        } else {
-                            File {
-                                is_local: local,
-                                worktree_path: worktree_path.clone(),
-                                entry_id: None,
-                                path: old_file.path().clone(),
-                                mtime: old_file.mtime(),
-                                worktree: worktree_handle.clone(),
-                            }
-                        };
-
-                        if let Some(task) = buffer.file_updated(Box::new(new_file), cx) {
-                            task.detach();
-                        }
-                    }
-                });
-            } else {
-                buffers_to_delete.push(*buffer_id);
-            }
-        }
-
-        for buffer_id in buffers_to_delete {
-            match self {
-                Self::Local(worktree) => {
-                    worktree.open_buffers.remove(&buffer_id);
-                }
-                Self::Remote(worktree) => {
-                    worktree.open_buffers.remove(&buffer_id);
-                }
-            }
-        }
-    }
-
     fn send_buffer_update(
         &mut self,
         buffer_id: u64,
@@ -770,9 +420,6 @@ pub struct LocalWorktree {
     poll_task: Option<Task<()>>,
     registration: Registration,
     share: Option<ShareState>,
-    loading_buffers: LoadingBuffers,
-    open_buffers: HashMap<usize, WeakModelHandle<Buffer>>,
-    shared_buffers: HashMap<PeerId, HashMap<u64, ModelHandle<Buffer>>>,
     diagnostics: HashMap<Arc<Path>, Vec<DiagnosticEntry<PointUtf16>>>,
     diagnostic_summaries: TreeMap<PathKey, DiagnosticSummary>,
     queued_operations: Vec<(u64, Operation)>,
@@ -795,26 +442,17 @@ struct ShareState {
 }
 
 pub struct RemoteWorktree {
+    pub(crate) snapshot: Snapshot,
     project_id: u64,
-    snapshot: Snapshot,
     snapshot_rx: watch::Receiver<Snapshot>,
     client: Arc<Client>,
     updates_tx: postage::mpsc::Sender<proto::UpdateWorktree>,
     replica_id: ReplicaId,
-    loading_buffers: LoadingBuffers,
-    open_buffers: HashMap<usize, RemoteBuffer>,
     queued_operations: Vec<(u64, Operation)>,
     diagnostic_summaries: TreeMap<PathKey, DiagnosticSummary>,
     weak: bool,
 }
 
-type LoadingBuffers = HashMap<
-    Arc<Path>,
-    postage::watch::Receiver<
-        Option<Result<(ModelHandle<Buffer>, Arc<AtomicBool>), Arc<anyhow::Error>>>,
-    >,
->;
-
 #[derive(Default, Deserialize)]
 struct WorktreeConfig {
     collaborators: Vec<String>,
@@ -883,9 +521,6 @@ impl LocalWorktree {
                 registration: Registration::None,
                 share: None,
                 poll_task: None,
-                loading_buffers: Default::default(),
-                open_buffers: Default::default(),
-                shared_buffers: Default::default(),
                 diagnostics: Default::default(),
                 diagnostic_summaries: Default::default(),
                 queued_operations: Default::default(),
@@ -931,29 +566,7 @@ impl LocalWorktree {
         self.config.collaborators.clone()
     }
 
-    fn get_open_buffer(
-        &mut self,
-        path: &Path,
-        cx: &mut ModelContext<Worktree>,
-    ) -> Option<ModelHandle<Buffer>> {
-        let handle = cx.handle();
-        let mut result = None;
-        self.open_buffers.retain(|_buffer_id, buffer| {
-            if let Some(buffer) = buffer.upgrade(cx) {
-                if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
-                    if file.worktree == handle && file.path().as_ref() == path {
-                        result = Some(buffer);
-                    }
-                }
-                true
-            } else {
-                false
-            }
-        });
-        result
-    }
-
-    fn open_buffer(
+    pub(crate) fn open_buffer(
         &mut self,
         path: &Path,
         cx: &mut ModelContext<Worktree>,
@@ -968,195 +581,22 @@ impl LocalWorktree {
                 this.as_local_mut().unwrap().diagnostics.get(&path).cloned()
             });
 
-            let mut buffer_operations = Vec::new();
-            let buffer = cx.add_model(|cx| {
+            Ok(cx.add_model(|cx| {
                 let mut buffer = Buffer::from_file(0, contents, Box::new(file), cx);
                 if let Some(diagnostics) = diagnostics {
-                    let op = buffer.update_diagnostics(None, diagnostics, cx).unwrap();
-                    buffer_operations.push(op);
+                    buffer.update_diagnostics(None, diagnostics, cx).unwrap();
                 }
                 buffer
-            });
-
-            this.update(&mut cx, |this, cx| {
-                for op in buffer_operations {
-                    this.send_buffer_update(buffer.read(cx).remote_id(), op, cx);
-                }
-                let this = this.as_local_mut().unwrap();
-                this.open_buffers.insert(buffer.id(), buffer.downgrade());
-            });
-
-            Ok(buffer)
+            }))
         })
     }
 
-    pub fn open_remote_buffer(
-        &mut self,
-        peer_id: PeerId,
-        buffer: ModelHandle<Buffer>,
-        cx: &mut ModelContext<Worktree>,
-    ) -> proto::OpenBufferResponse {
-        self.shared_buffers
-            .entry(peer_id)
-            .or_default()
-            .insert(buffer.id() as u64, buffer.clone());
-        proto::OpenBufferResponse {
-            buffer: Some(buffer.update(cx.as_mut(), |buffer, _| buffer.to_proto())),
-        }
-    }
-
-    pub fn close_remote_buffer(
-        &mut self,
-        envelope: TypedEnvelope<proto::CloseBuffer>,
-        cx: &mut ModelContext<Worktree>,
-    ) -> Result<()> {
-        if let Some(shared_buffers) = self.shared_buffers.get_mut(&envelope.original_sender_id()?) {
-            shared_buffers.remove(&envelope.payload.buffer_id);
-            cx.notify();
-        }
-
-        Ok(())
-    }
-
-    pub fn remove_collaborator(
-        &mut self,
-        peer_id: PeerId,
-        replica_id: ReplicaId,
-        cx: &mut ModelContext<Worktree>,
-    ) {
-        self.shared_buffers.remove(&peer_id);
-        for (_, buffer) in &self.open_buffers {
-            if let Some(buffer) = buffer.upgrade(cx) {
-                buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
-            }
-        }
-        cx.notify();
-    }
-
     pub fn update_diagnostics(
         &mut self,
         worktree_path: Arc<Path>,
-        params: lsp::PublishDiagnosticsParams,
-        disk_based_sources: &HashSet<String>,
-        cx: &mut ModelContext<Worktree>,
-    ) -> Result<()> {
-        let mut next_group_id = 0;
-        let mut diagnostics = Vec::default();
-        let mut primary_diagnostic_group_ids = HashMap::default();
-        let mut sources_by_group_id = HashMap::default();
-        let mut supporting_diagnostic_severities = HashMap::default();
-        for diagnostic in &params.diagnostics {
-            let source = diagnostic.source.as_ref();
-            let code = diagnostic.code.as_ref().map(|code| match code {
-                lsp::NumberOrString::Number(code) => code.to_string(),
-                lsp::NumberOrString::String(code) => code.clone(),
-            });
-            let range = range_from_lsp(diagnostic.range);
-            let is_supporting = diagnostic
-                .related_information
-                .as_ref()
-                .map_or(false, |infos| {
-                    infos.iter().any(|info| {
-                        primary_diagnostic_group_ids.contains_key(&(
-                            source,
-                            code.clone(),
-                            range_from_lsp(info.location.range),
-                        ))
-                    })
-                });
-
-            if is_supporting {
-                if let Some(severity) = diagnostic.severity {
-                    supporting_diagnostic_severities
-                        .insert((source, code.clone(), range), severity);
-                }
-            } else {
-                let group_id = post_inc(&mut next_group_id);
-                let is_disk_based =
-                    source.map_or(false, |source| disk_based_sources.contains(source));
-
-                sources_by_group_id.insert(group_id, source);
-                primary_diagnostic_group_ids
-                    .insert((source, code.clone(), range.clone()), group_id);
-
-                diagnostics.push(DiagnosticEntry {
-                    range,
-                    diagnostic: Diagnostic {
-                        code: code.clone(),
-                        severity: diagnostic.severity.unwrap_or(DiagnosticSeverity::ERROR),
-                        message: diagnostic.message.clone(),
-                        group_id,
-                        is_primary: true,
-                        is_valid: true,
-                        is_disk_based,
-                    },
-                });
-                if let Some(infos) = &diagnostic.related_information {
-                    for info in infos {
-                        if info.location.uri == params.uri {
-                            let range = range_from_lsp(info.location.range);
-                            diagnostics.push(DiagnosticEntry {
-                                range,
-                                diagnostic: Diagnostic {
-                                    code: code.clone(),
-                                    severity: DiagnosticSeverity::INFORMATION,
-                                    message: info.message.clone(),
-                                    group_id,
-                                    is_primary: false,
-                                    is_valid: true,
-                                    is_disk_based,
-                                },
-                            });
-                        }
-                    }
-                }
-            }
-        }
-
-        for entry in &mut diagnostics {
-            let diagnostic = &mut entry.diagnostic;
-            if !diagnostic.is_primary {
-                let source = *sources_by_group_id.get(&diagnostic.group_id).unwrap();
-                if let Some(&severity) = supporting_diagnostic_severities.get(&(
-                    source,
-                    diagnostic.code.clone(),
-                    entry.range.clone(),
-                )) {
-                    diagnostic.severity = severity;
-                }
-            }
-        }
-
-        self.update_diagnostic_entries(worktree_path, params.version, diagnostics, cx)?;
-        Ok(())
-    }
-
-    pub fn update_diagnostic_entries(
-        &mut self,
-        worktree_path: Arc<Path>,
-        version: Option<i32>,
         diagnostics: Vec<DiagnosticEntry<PointUtf16>>,
         cx: &mut ModelContext<Worktree>,
     ) -> Result<()> {
-        for buffer in self.open_buffers.values() {
-            if let Some(buffer) = buffer.upgrade(cx) {
-                if buffer
-                    .read(cx)
-                    .file()
-                    .map_or(false, |file| *file.path() == worktree_path)
-                {
-                    let (remote_id, operation) = buffer.update(cx, |buffer, cx| {
-                        (
-                            buffer.remote_id(),
-                            buffer.update_diagnostics(version, diagnostics.clone(), cx),
-                        )
-                    });
-                    self.send_buffer_update(remote_id, operation?, cx);
-                    break;
-                }
-            }
-        }
-
         let summary = DiagnosticSummary::new(&diagnostics);
         self.diagnostic_summaries
             .insert(PathKey(worktree_path.clone()), summary.clone());
@@ -1192,40 +632,6 @@ impl LocalWorktree {
         Ok(())
     }
 
-    fn send_buffer_update(
-        &mut self,
-        buffer_id: u64,
-        operation: Operation,
-        cx: &mut ModelContext<Worktree>,
-    ) -> Option<()> {
-        let share = self.share.as_ref()?;
-        let project_id = share.project_id;
-        let worktree_id = self.id();
-        let rpc = self.client.clone();
-        cx.spawn(|worktree, mut cx| async move {
-            if let Err(error) = rpc
-                .request(proto::UpdateBuffer {
-                    project_id,
-                    worktree_id: worktree_id.0 as u64,
-                    buffer_id,
-                    operations: vec![language::proto::serialize_operation(&operation)],
-                })
-                .await
-            {
-                worktree.update(&mut cx, |worktree, _| {
-                    log::error!("error sending buffer operation: {}", error);
-                    worktree
-                        .as_local_mut()
-                        .unwrap()
-                        .queued_operations
-                        .push((buffer_id, operation));
-                });
-            }
-        })
-        .detach();
-        None
-    }
-
     pub fn scan_complete(&self) -> impl Future<Output = ()> {
         let mut scan_state_rx = self.last_scan_state_rx.clone();
         async move {
@@ -1248,22 +654,6 @@ impl LocalWorktree {
         self.snapshot.clone()
     }
 
-    pub fn abs_path(&self) -> &Arc<Path> {
-        &self.snapshot.abs_path
-    }
-
-    pub fn contains_abs_path(&self, path: &Path) -> bool {
-        path.starts_with(&self.snapshot.abs_path)
-    }
-
-    fn absolutize(&self, path: &Path) -> PathBuf {
-        if path.file_name().is_some() {
-            self.snapshot.abs_path.join(path)
-        } else {
-            self.snapshot.abs_path.to_path_buf()
-        }
-    }
-
     fn load(&self, path: &Path, cx: &mut ModelContext<Worktree>) -> Task<Result<(File, String)>> {
         let handle = cx.handle();
         let path = Arc::from(path);
@@ -1304,8 +694,6 @@ impl LocalWorktree {
             let entry = save.await?;
             let file = this.update(&mut cx, |this, cx| {
                 let this = this.as_local_mut().unwrap();
-                this.open_buffers
-                    .insert(buffer_handle.id(), buffer_handle.downgrade());
                 File {
                     entry_id: Some(entry.id),
                     worktree: cx.handle(),
@@ -1485,29 +873,7 @@ impl fmt::Debug for LocalWorktree {
 }
 
 impl RemoteWorktree {
-    fn get_open_buffer(
-        &mut self,
-        path: &Path,
-        cx: &mut ModelContext<Worktree>,
-    ) -> Option<ModelHandle<Buffer>> {
-        let handle = cx.handle();
-        let mut existing_buffer = None;
-        self.open_buffers.retain(|_buffer_id, buffer| {
-            if let Some(buffer) = buffer.upgrade(cx.as_ref()) {
-                if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
-                    if file.worktree == handle && file.path().as_ref() == path {
-                        existing_buffer = Some(buffer);
-                    }
-                }
-                true
-            } else {
-                false
-            }
-        });
-        existing_buffer
-    }
-
-    fn open_buffer(
+    pub(crate) fn open_buffer(
         &mut self,
         path: &Path,
         cx: &mut ModelContext<Worktree>,
@@ -1545,20 +911,9 @@ impl RemoteWorktree {
                 is_local: false,
             };
             let remote_buffer = response.buffer.ok_or_else(|| anyhow!("empty buffer"))?;
-            let buffer_id = remote_buffer.id as usize;
-            let buffer = cx.add_model(|cx| {
+            Ok(cx.add_model(|cx| {
                 Buffer::from_proto(replica_id, remote_buffer, Some(Box::new(file)), cx).unwrap()
-            });
-            this.update(&mut cx, move |this, cx| {
-                let this = this.as_remote_mut().unwrap();
-                if let Some(RemoteBuffer::Operations(pending_ops)) = this
-                    .open_buffers
-                    .insert(buffer_id, RemoteBuffer::Loaded(buffer.downgrade()))
-                {
-                    buffer.update(cx, |buf, cx| buf.apply_ops(pending_ops, cx))?;
-                }
-                Result::<_, anyhow::Error>::Ok(buffer)
-            })
+            }))
         })
     }
 
@@ -1597,29 +952,6 @@ impl RemoteWorktree {
             },
         );
     }
-
-    pub fn remove_collaborator(&mut self, replica_id: ReplicaId, cx: &mut ModelContext<Worktree>) {
-        for (_, buffer) in &self.open_buffers {
-            if let Some(buffer) = buffer.upgrade(cx) {
-                buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
-            }
-        }
-        cx.notify();
-    }
-}
-
-enum RemoteBuffer {
-    Operations(Vec<Operation>),
-    Loaded(WeakModelHandle<Buffer>),
-}
-
-impl RemoteBuffer {
-    fn upgrade(&self, cx: &impl UpgradeModelHandle) -> Option<ModelHandle<Buffer>> {
-        match self {
-            Self::Operations(_) => None,
-            Self::Loaded(buffer) => buffer.upgrade(cx),
-        }
-    }
 }
 
 impl Snapshot {
@@ -1627,7 +959,7 @@ impl Snapshot {
         self.id
     }
 
-    pub fn to_proto(
+    pub(crate) fn to_proto(
         &self,
         diagnostic_summaries: &TreeMap<PathKey, DiagnosticSummary>,
         weak: bool,
@@ -1650,7 +982,7 @@ impl Snapshot {
         }
     }
 
-    pub fn build_update(
+    pub(crate) fn build_update(
         &self,
         other: &Self,
         project_id: u64,
@@ -1715,7 +1047,7 @@ impl Snapshot {
         }
     }
 
-    fn apply_update(&mut self, update: proto::UpdateWorktree) -> Result<()> {
+    pub(crate) fn apply_update(&mut self, update: proto::UpdateWorktree) -> Result<()> {
         self.scan_id += 1;
         let scan_id = self.scan_id;
 
@@ -1826,6 +1158,22 @@ impl Snapshot {
         }
     }
 
+    pub fn contains_abs_path(&self, path: &Path) -> bool {
+        path.starts_with(&self.abs_path)
+    }
+
+    fn absolutize(&self, path: &Path) -> PathBuf {
+        if path.file_name().is_some() {
+            self.abs_path.join(path)
+        } else {
+            self.abs_path.to_path_buf()
+        }
+    }
+
+    pub fn abs_path(&self) -> &Arc<Path> {
+        &self.abs_path
+    }
+
     pub fn root_entry(&self) -> Option<&Entry> {
         self.entry_for_path("")
     }
@@ -2006,12 +1354,12 @@ impl fmt::Debug for Snapshot {
 
 #[derive(Clone, PartialEq)]
 pub struct File {
-    entry_id: Option<usize>,
     pub worktree: ModelHandle<Worktree>,
-    worktree_path: Arc<Path>,
     pub path: Arc<Path>,
     pub mtime: SystemTime,
-    is_local: bool,
+    pub(crate) entry_id: Option<usize>,
+    pub(crate) worktree_path: Arc<Path>,
+    pub(crate) is_local: bool,
 }
 
 impl language::File for File {
@@ -3062,18 +2410,13 @@ mod tests {
     use anyhow::Result;
     use client::test::FakeHttpClient;
     use fs::RealFs;
-    use language::{Diagnostic, DiagnosticEntry};
-    use lsp::Url;
     use rand::prelude::*;
     use serde_json::json;
-    use std::{cell::RefCell, rc::Rc};
     use std::{
         env,
         fmt::Write,
         time::{SystemTime, UNIX_EPOCH},
     };
-    use text::Point;
-    use unindent::Unindent as _;
     use util::test::temp_tree;
 
     #[gpui::test]
@@ -3094,7 +2437,7 @@ mod tests {
         let http_client = FakeHttpClient::with_404_response();
         let client = Client::new(http_client);
 
-        let tree = Worktree::open_local(
+        let tree = Worktree::local(
             client,
             Arc::from(Path::new("/root")),
             false,
@@ -3121,225 +2464,6 @@ mod tests {
         })
     }
 
-    #[gpui::test]
-    async fn test_save_file(mut cx: gpui::TestAppContext) {
-        let dir = temp_tree(json!({
-            "file1": "the old contents",
-        }));
-
-        let http_client = FakeHttpClient::with_404_response();
-        let client = Client::new(http_client);
-
-        let tree = Worktree::open_local(
-            client,
-            dir.path(),
-            false,
-            Arc::new(RealFs),
-            &mut cx.to_async(),
-        )
-        .await
-        .unwrap();
-        let (buffer, _) = tree
-            .update(&mut cx, |tree, cx| tree.open_buffer("file1", cx))
-            .await
-            .unwrap();
-        let save = buffer.update(&mut cx, |buffer, cx| {
-            buffer.edit(Some(0..0), "a line of text.\n".repeat(10 * 1024), cx);
-            buffer.save(cx)
-        });
-        save.await.unwrap();
-
-        let new_text = std::fs::read_to_string(dir.path().join("file1")).unwrap();
-        assert_eq!(new_text, buffer.read_with(&cx, |buffer, _| buffer.text()));
-    }
-
-    #[gpui::test]
-    async fn test_save_in_single_file_worktree(mut cx: gpui::TestAppContext) {
-        let dir = temp_tree(json!({
-            "file1": "the old contents",
-        }));
-        let file_path = dir.path().join("file1");
-
-        let http_client = FakeHttpClient::with_404_response();
-        let client = Client::new(http_client);
-
-        let tree = Worktree::open_local(
-            client,
-            file_path.clone(),
-            false,
-            Arc::new(RealFs),
-            &mut cx.to_async(),
-        )
-        .await
-        .unwrap();
-        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
-            .await;
-        cx.read(|cx| assert_eq!(tree.read(cx).file_count(), 1));
-
-        let (buffer, _) = tree
-            .update(&mut cx, |tree, cx| tree.open_buffer("", cx))
-            .await
-            .unwrap();
-        let save = buffer.update(&mut cx, |buffer, cx| {
-            buffer.edit(Some(0..0), "a line of text.\n".repeat(10 * 1024), cx);
-            buffer.save(cx)
-        });
-        save.await.unwrap();
-
-        let new_text = std::fs::read_to_string(file_path).unwrap();
-        assert_eq!(new_text, buffer.read_with(&cx, |buffer, _| buffer.text()));
-    }
-
-    #[gpui::test]
-    async fn test_rescan_and_remote_updates(mut cx: gpui::TestAppContext) {
-        let dir = temp_tree(json!({
-            "a": {
-                "file1": "",
-                "file2": "",
-                "file3": "",
-            },
-            "b": {
-                "c": {
-                    "file4": "",
-                    "file5": "",
-                }
-            }
-        }));
-
-        let http_client = FakeHttpClient::with_404_response();
-        let client = Client::new(http_client.clone());
-        let tree = Worktree::open_local(
-            client,
-            dir.path(),
-            false,
-            Arc::new(RealFs),
-            &mut cx.to_async(),
-        )
-        .await
-        .unwrap();
-
-        let buffer_for_path = |path: &'static str, cx: &mut gpui::TestAppContext| {
-            let buffer = tree.update(cx, |tree, cx| tree.open_buffer(path, cx));
-            async move { buffer.await.unwrap().0 }
-        };
-        let id_for_path = |path: &'static str, cx: &gpui::TestAppContext| {
-            tree.read_with(cx, |tree, _| {
-                tree.entry_for_path(path)
-                    .expect(&format!("no entry for path {}", path))
-                    .id
-            })
-        };
-
-        let buffer2 = buffer_for_path("a/file2", &mut cx).await;
-        let buffer3 = buffer_for_path("a/file3", &mut cx).await;
-        let buffer4 = buffer_for_path("b/c/file4", &mut cx).await;
-        let buffer5 = buffer_for_path("b/c/file5", &mut cx).await;
-
-        let file2_id = id_for_path("a/file2", &cx);
-        let file3_id = id_for_path("a/file3", &cx);
-        let file4_id = id_for_path("b/c/file4", &cx);
-
-        // Wait for the initial scan.
-        cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
-            .await;
-
-        // Create a remote copy of this worktree.
-        let initial_snapshot = tree.read_with(&cx, |tree, _| tree.snapshot());
-        let remote = Worktree::remote(
-            1,
-            1,
-            initial_snapshot.to_proto(&Default::default(), Default::default()),
-            Client::new(http_client.clone()),
-            &mut cx.to_async(),
-        )
-        .await
-        .unwrap();
-
-        cx.read(|cx| {
-            assert!(!buffer2.read(cx).is_dirty());
-            assert!(!buffer3.read(cx).is_dirty());
-            assert!(!buffer4.read(cx).is_dirty());
-            assert!(!buffer5.read(cx).is_dirty());
-        });
-
-        // Rename and delete files and directories.
-        tree.flush_fs_events(&cx).await;
-        std::fs::rename(dir.path().join("a/file3"), dir.path().join("b/c/file3")).unwrap();
-        std::fs::remove_file(dir.path().join("b/c/file5")).unwrap();
-        std::fs::rename(dir.path().join("b/c"), dir.path().join("d")).unwrap();
-        std::fs::rename(dir.path().join("a/file2"), dir.path().join("a/file2.new")).unwrap();
-        tree.flush_fs_events(&cx).await;
-
-        let expected_paths = vec![
-            "a",
-            "a/file1",
-            "a/file2.new",
-            "b",
-            "d",
-            "d/file3",
-            "d/file4",
-        ];
-
-        cx.read(|app| {
-            assert_eq!(
-                tree.read(app)
-                    .paths()
-                    .map(|p| p.to_str().unwrap())
-                    .collect::<Vec<_>>(),
-                expected_paths
-            );
-
-            assert_eq!(id_for_path("a/file2.new", &cx), file2_id);
-            assert_eq!(id_for_path("d/file3", &cx), file3_id);
-            assert_eq!(id_for_path("d/file4", &cx), file4_id);
-
-            assert_eq!(
-                buffer2.read(app).file().unwrap().path().as_ref(),
-                Path::new("a/file2.new")
-            );
-            assert_eq!(
-                buffer3.read(app).file().unwrap().path().as_ref(),
-                Path::new("d/file3")
-            );
-            assert_eq!(
-                buffer4.read(app).file().unwrap().path().as_ref(),
-                Path::new("d/file4")
-            );
-            assert_eq!(
-                buffer5.read(app).file().unwrap().path().as_ref(),
-                Path::new("b/c/file5")
-            );
-
-            assert!(!buffer2.read(app).file().unwrap().is_deleted());
-            assert!(!buffer3.read(app).file().unwrap().is_deleted());
-            assert!(!buffer4.read(app).file().unwrap().is_deleted());
-            assert!(buffer5.read(app).file().unwrap().is_deleted());
-        });
-
-        // Update the remote worktree. Check that it becomes consistent with the
-        // local worktree.
-        remote.update(&mut cx, |remote, cx| {
-            let update_message =
-                tree.read(cx)
-                    .snapshot()
-                    .build_update(&initial_snapshot, 1, 1, true);
-            remote
-                .as_remote_mut()
-                .unwrap()
-                .snapshot
-                .apply_update(update_message)
-                .unwrap();
-
-            assert_eq!(
-                remote
-                    .paths()
-                    .map(|p| p.to_str().unwrap())
-                    .collect::<Vec<_>>(),
-                expected_paths
-            );
-        });
-    }
-
     #[gpui::test]
     async fn test_rescan_with_gitignore(cx: gpui::TestAppContext) {
         let dir = temp_tree(json!({