Detailed changes
@@ -725,7 +725,6 @@ mod tests {
use gpui::TestAppContext;
use language::{Diagnostic, DiagnosticEntry, DiagnosticSeverity, PointUtf16};
use serde_json::json;
- use std::sync::Arc;
use unindent::Unindent as _;
use workspace::WorkspaceParams;
@@ -764,21 +763,18 @@ mod tests {
)
.await;
- let (worktree, _) = project
+ project
.update(&mut cx, |project, cx| {
- project.find_or_create_worktree_for_abs_path("/test", false, cx)
+ project.find_or_create_local_worktree("/test", false, cx)
})
.await
.unwrap();
- let worktree_id = worktree.read_with(&cx, |tree, _| tree.id());
// Create some diagnostics
- worktree.update(&mut cx, |worktree, cx| {
- worktree
- .as_local_mut()
- .unwrap()
+ project.update(&mut cx, |project, cx| {
+ project
.update_diagnostic_entries(
- Arc::from("/test/main.rs".as_ref()),
+ PathBuf::from("/test/main.rs"),
None,
vec![
DiagnosticEntry {
@@ -926,12 +922,11 @@ mod tests {
});
// Diagnostics are added for another earlier path.
- worktree.update(&mut cx, |worktree, cx| {
- worktree
- .as_local_mut()
- .unwrap()
+ project.update(&mut cx, |project, cx| {
+ project.disk_based_diagnostics_started(cx);
+ project
.update_diagnostic_entries(
- Arc::from("/test/consts.rs".as_ref()),
+ PathBuf::from("/test/consts.rs"),
None,
vec![DiagnosticEntry {
range: PointUtf16::new(0, 15)..PointUtf16::new(0, 15),
@@ -947,13 +942,7 @@ mod tests {
cx,
)
.unwrap();
- });
- project.update(&mut cx, |_, cx| {
- cx.emit(project::Event::DiagnosticsUpdated(ProjectPath {
- worktree_id,
- path: Arc::from("/test/consts.rs".as_ref()),
- }));
- cx.emit(project::Event::DiskBasedDiagnosticsFinished);
+ project.disk_based_diagnostics_finished(cx);
});
view.next_notification(&cx).await;
@@ -1032,12 +1021,11 @@ mod tests {
});
// Diagnostics are added to the first path
- worktree.update(&mut cx, |worktree, cx| {
- worktree
- .as_local_mut()
- .unwrap()
+ project.update(&mut cx, |project, cx| {
+ project.disk_based_diagnostics_started(cx);
+ project
.update_diagnostic_entries(
- Arc::from("/test/consts.rs".as_ref()),
+ PathBuf::from("/test/consts.rs"),
None,
vec![
DiagnosticEntry {
@@ -1067,13 +1055,7 @@ mod tests {
cx,
)
.unwrap();
- });
- project.update(&mut cx, |_, cx| {
- cx.emit(project::Event::DiagnosticsUpdated(ProjectPath {
- worktree_id,
- path: Arc::from("/test/consts.rs".as_ref()),
- }));
- cx.emit(project::Event::DiskBasedDiagnosticsFinished);
+ project.disk_based_diagnostics_finished(cx);
});
view.next_notification(&cx).await;
@@ -457,7 +457,7 @@ mod tests {
params
.project
.update(&mut cx, |project, cx| {
- project.find_or_create_worktree_for_abs_path(Path::new("/root"), false, cx)
+ project.find_or_create_local_worktree("/root", false, cx)
})
.await
.unwrap();
@@ -518,7 +518,7 @@ mod tests {
params
.project
.update(&mut cx, |project, cx| {
- project.find_or_create_worktree_for_abs_path(Path::new("/dir"), false, cx)
+ project.find_or_create_local_worktree("/dir", false, cx)
})
.await
.unwrap();
@@ -584,11 +584,7 @@ mod tests {
params
.project
.update(&mut cx, |project, cx| {
- project.find_or_create_worktree_for_abs_path(
- Path::new("/root/the-parent-dir/the-file"),
- false,
- cx,
- )
+ project.find_or_create_local_worktree("/root/the-parent-dir/the-file", false, cx)
})
.await
.unwrap();
@@ -1653,8 +1653,13 @@ impl MutableAppContext {
Fut: 'static + Future<Output = T>,
T: 'static,
{
+ let future = f(self.to_async());
let cx = self.to_async();
- self.foreground.spawn(f(cx))
+ self.foreground.spawn(async move {
+ let result = future.await;
+ cx.0.borrow_mut().flush_effects();
+ result
+ })
}
pub fn to_async(&self) -> AsyncAppContext {
@@ -856,7 +856,7 @@ impl Buffer {
version: Option<i32>,
mut diagnostics: Vec<DiagnosticEntry<T>>,
cx: &mut ModelContext<Self>,
- ) -> Result<Operation>
+ ) -> Result<()>
where
T: Copy + Ord + TextDimension + Sub<Output = T> + Clip + ToPoint,
{
@@ -869,19 +869,19 @@ impl Buffer {
}
let version = version.map(|version| version as usize);
- let content = if let Some(version) = version {
- let language_server = self.language_server.as_mut().unwrap();
- language_server
- .pending_snapshots
- .retain(|&v, _| v >= version);
- let snapshot = language_server
- .pending_snapshots
- .get(&version)
- .ok_or_else(|| anyhow!("missing snapshot"))?;
- &snapshot.buffer_snapshot
- } else {
- self.deref()
- };
+ let content =
+ if let Some((version, language_server)) = version.zip(self.language_server.as_mut()) {
+ language_server
+ .pending_snapshots
+ .retain(|&v, _| v >= version);
+ let snapshot = language_server
+ .pending_snapshots
+ .get(&version)
+ .ok_or_else(|| anyhow!("missing snapshot"))?;
+ &snapshot.buffer_snapshot
+ } else {
+ self.deref()
+ };
diagnostics.sort_unstable_by(|a, b| {
Ordering::Equal
@@ -944,10 +944,13 @@ impl Buffer {
let set = DiagnosticSet::new(sanitized_diagnostics, content);
self.apply_diagnostic_update(set.clone(), cx);
- Ok(Operation::UpdateDiagnostics {
+
+ let op = Operation::UpdateDiagnostics {
diagnostics: set.iter().cloned().collect(),
lamport_timestamp: self.text.lamport_clock.tick(),
- })
+ };
+ self.send_operation(op, cx);
+ Ok(())
}
fn request_autoindent(&mut self, cx: &mut ModelContext<Self>) {
@@ -13,17 +13,19 @@ use gpui::{
WeakModelHandle,
};
use language::{
- Bias, Buffer, DiagnosticEntry, File as _, Language, LanguageRegistry, ToOffset, ToPointUtf16,
+ range_from_lsp, Bias, Buffer, Diagnostic, DiagnosticEntry, File as _, Language,
+ LanguageRegistry, Operation, PointUtf16, ToOffset, ToPointUtf16,
};
use lsp::{DiagnosticSeverity, LanguageServer};
use postage::{prelude::Stream, watch};
use smol::block_on;
use std::{
+ convert::TryInto,
ops::Range,
path::{Path, PathBuf},
sync::{atomic::AtomicBool, Arc},
};
-use util::{ResultExt, TryFutureExt as _};
+use util::{post_inc, ResultExt, TryFutureExt as _};
pub use fs::*;
pub use worktree::*;
@@ -40,6 +42,17 @@ pub struct Project {
collaborators: HashMap<PeerId, Collaborator>,
subscriptions: Vec<client::Subscription>,
language_servers_with_diagnostics_running: isize,
+ open_buffers: HashMap<usize, OpenBuffer>,
+ loading_buffers: HashMap<
+ ProjectPath,
+ postage::watch::Receiver<Option<Result<ModelHandle<Buffer>, Arc<anyhow::Error>>>>,
+ >,
+ shared_buffers: HashMap<PeerId, HashMap<u64, ModelHandle<Buffer>>>,
+}
+
+enum OpenBuffer {
+ Operations(Vec<Operation>),
+ Loaded(WeakModelHandle<Buffer>),
}
enum WorktreeHandle {
@@ -192,6 +205,9 @@ impl Project {
Self {
worktrees: Default::default(),
collaborators: Default::default(),
+ open_buffers: Default::default(),
+ loading_buffers: Default::default(),
+ shared_buffers: Default::default(),
client_state: ProjectClientState::Local {
is_shared: false,
remote_id_tx,
@@ -230,17 +246,8 @@ impl Project {
let mut worktrees = Vec::new();
for worktree in response.worktrees {
- worktrees.push(
- Worktree::remote(
- remote_id,
- replica_id,
- worktree,
- client.clone(),
- user_store.clone(),
- cx,
- )
- .await?,
- );
+ worktrees
+ .push(Worktree::remote(remote_id, replica_id, worktree, client.clone(), cx).await?);
}
let user_ids = response
@@ -260,6 +267,9 @@ impl Project {
Ok(cx.add_model(|cx| {
let mut this = Self {
worktrees: Vec::new(),
+ open_buffers: Default::default(),
+ loading_buffers: Default::default(),
+ shared_buffers: Default::default(),
active_entry: None,
collaborators,
languages,
@@ -412,7 +422,7 @@ impl Project {
for worktree in this.worktrees(cx).collect::<Vec<_>>() {
worktree.update(cx, |worktree, cx| {
let worktree = worktree.as_local_mut().unwrap();
- tasks.push(worktree.share(cx));
+ tasks.push(worktree.share(project_id, cx));
});
}
});
@@ -476,23 +486,65 @@ impl Project {
pub fn open_buffer(
&mut self,
- path: ProjectPath,
+ path: impl Into<ProjectPath>,
cx: &mut ModelContext<Self>,
) -> Task<Result<ModelHandle<Buffer>>> {
- let worktree = if let Some(worktree) = self.worktree_for_id(path.worktree_id, cx) {
+ let project_path = path.into();
+ let worktree = if let Some(worktree) = self.worktree_for_id(project_path.worktree_id, cx) {
worktree
} else {
- return cx.spawn(|_, _| async move { Err(anyhow!("no such worktree")) });
+ return Task::ready(Err(anyhow!("no such worktree")));
};
- let buffer_task = worktree.update(cx, |worktree, cx| worktree.open_buffer(path.path, cx));
- cx.spawn(|this, mut cx| async move {
- let (buffer, buffer_is_new) = buffer_task.await?;
- if buffer_is_new {
- this.update(&mut cx, |this, cx| {
- this.assign_language_to_buffer(worktree, buffer.clone(), cx)
+
+ // If there is already a buffer for the given path, then return it.
+ let existing_buffer = self.get_open_buffer(&project_path, cx);
+ if let Some(existing_buffer) = existing_buffer {
+ return Task::ready(Ok(existing_buffer));
+ }
+
+ let mut loading_watch = match self.loading_buffers.entry(project_path.clone()) {
+ // If the given path is already being loaded, then wait for that existing
+ // task to complete and return the same buffer.
+ hash_map::Entry::Occupied(e) => e.get().clone(),
+
+ // Otherwise, record the fact that this path is now being loaded.
+ hash_map::Entry::Vacant(entry) => {
+ let (mut tx, rx) = postage::watch::channel();
+ entry.insert(rx.clone());
+
+ let load_buffer = worktree.update(cx, |worktree, cx| {
+ worktree.load_buffer(&project_path.path, cx)
});
+
+ cx.spawn(move |this, mut cx| async move {
+ let load_result = load_buffer.await;
+ *tx.borrow_mut() = Some(this.update(&mut cx, |this, cx| {
+ // Record the fact that the buffer is no longer loading.
+ this.loading_buffers.remove(&project_path);
+ let buffer = load_result.map_err(Arc::new)?;
+ this.open_buffers.insert(
+ buffer.read(cx).remote_id() as usize,
+ OpenBuffer::Loaded(buffer.downgrade()),
+ );
+ this.assign_language_to_buffer(&worktree, &buffer, cx);
+ Ok(buffer)
+ }));
+ })
+ .detach();
+ rx
+ }
+ };
+
+ cx.foreground().spawn(async move {
+ loop {
+ if let Some(result) = loading_watch.borrow().as_ref() {
+ match result {
+ Ok(buffer) => return Ok(buffer.clone()),
+ Err(error) => return Err(anyhow!("{}", error)),
+ }
+ }
+ loading_watch.recv().await;
}
- Ok(buffer)
})
}
@@ -502,7 +554,7 @@ impl Project {
abs_path: PathBuf,
cx: &mut ModelContext<Project>,
) -> Task<Result<()>> {
- let worktree_task = self.find_or_create_worktree_for_abs_path(&abs_path, false, cx);
+ let worktree_task = self.find_or_create_local_worktree(&abs_path, false, cx);
cx.spawn(|this, mut cx| async move {
let (worktree, path) = worktree_task.await?;
worktree
@@ -514,43 +566,106 @@ impl Project {
})
.await?;
this.update(&mut cx, |this, cx| {
- this.assign_language_to_buffer(worktree, buffer, cx)
+ this.open_buffers
+ .insert(buffer.id(), OpenBuffer::Loaded(buffer.downgrade()));
+ this.assign_language_to_buffer(&worktree, &buffer, cx);
});
Ok(())
})
}
+ #[cfg(any(test, feature = "test-support"))]
+ pub fn has_open_buffer(&self, path: impl Into<ProjectPath>, cx: &AppContext) -> bool {
+ let path = path.into();
+ if let Some(worktree) = self.worktree_for_id(path.worktree_id, cx) {
+ self.open_buffers.iter().any(|(_, buffer)| {
+ if let Some(buffer) = buffer.upgrade(cx) {
+ if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
+ if file.worktree == worktree && file.path() == &path.path {
+ return true;
+ }
+ }
+ }
+ false
+ })
+ } else {
+ false
+ }
+ }
+
+ fn get_open_buffer(
+ &mut self,
+ path: &ProjectPath,
+ cx: &mut ModelContext<Self>,
+ ) -> Option<ModelHandle<Buffer>> {
+ let mut result = None;
+ let worktree = self.worktree_for_id(path.worktree_id, cx)?;
+ self.open_buffers.retain(|_, buffer| {
+ if let OpenBuffer::Loaded(buffer) = buffer {
+ if let Some(buffer) = buffer.upgrade(cx) {
+ if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
+ if file.worktree == worktree && file.path() == &path.path {
+ result = Some(buffer);
+ }
+ }
+ return true;
+ }
+ }
+ false
+ });
+ result
+ }
+
fn assign_language_to_buffer(
&mut self,
- worktree: ModelHandle<Worktree>,
- buffer: ModelHandle<Buffer>,
+ worktree: &ModelHandle<Worktree>,
+ buffer: &ModelHandle<Buffer>,
cx: &mut ModelContext<Self>,
) -> Option<()> {
- // Set the buffer's language
- let full_path = buffer.read(cx).file()?.full_path();
- let language = self.languages.select_language(&full_path)?.clone();
- buffer.update(cx, |buffer, cx| {
- buffer.set_language(Some(language.clone()), cx);
- });
+ let (path, full_path) = {
+ let file = buffer.read(cx).file()?;
+ (file.path().clone(), file.full_path())
+ };
- // For local worktrees, start a language server if needed.
- let worktree = worktree.read(cx);
- let worktree_id = worktree.id();
- let worktree_abs_path = worktree.as_local()?.abs_path().clone();
- let language_server = match self
- .language_servers
- .entry((worktree_id, language.name().to_string()))
- {
- hash_map::Entry::Occupied(e) => Some(e.get().clone()),
- hash_map::Entry::Vacant(e) => {
- Self::start_language_server(self.client.clone(), language, &worktree_abs_path, cx)
- .map(|server| e.insert(server).clone())
+ // If the buffer has a language, set it and start/assign the language server
+ if let Some(language) = self.languages.select_language(&full_path) {
+ buffer.update(cx, |buffer, cx| {
+ buffer.set_language(Some(language.clone()), cx);
+ });
+
+ // For local worktrees, start a language server if needed.
+ // Also assign the language server and any previously stored diagnostics to the buffer.
+ if let Some(local_worktree) = worktree.read(cx).as_local() {
+ let worktree_id = local_worktree.id();
+ let worktree_abs_path = local_worktree.abs_path().clone();
+
+ let language_server = match self
+ .language_servers
+ .entry((worktree_id, language.name().to_string()))
+ {
+ hash_map::Entry::Occupied(e) => Some(e.get().clone()),
+ hash_map::Entry::Vacant(e) => Self::start_language_server(
+ self.client.clone(),
+ language.clone(),
+ &worktree_abs_path,
+ cx,
+ )
+ .map(|server| e.insert(server).clone()),
+ };
+
+ buffer.update(cx, |buffer, cx| {
+ buffer.set_language_server(language_server, cx);
+ });
}
- };
+ }
- buffer.update(cx, |buffer, cx| {
- buffer.set_language_server(language_server, cx)
- });
+ if let Some(local_worktree) = worktree.read(cx).as_local() {
+ if let Some(diagnostics) = local_worktree.diagnostics_for_path(&path) {
+ buffer.update(cx, |buffer, cx| {
+ buffer.update_diagnostics(None, diagnostics, cx).log_err();
+ });
+ }
+ }
None
}
@@ -671,30 +786,141 @@ impl Project {
Some(language_server)
}
- fn update_diagnostics(
+ pub fn update_diagnostics(
&mut self,
- diagnostics: lsp::PublishDiagnosticsParams,
+ params: lsp::PublishDiagnosticsParams,
disk_based_sources: &HashSet<String>,
cx: &mut ModelContext<Self>,
) -> Result<()> {
- let path = diagnostics
+ let abs_path = params
.uri
.to_file_path()
.map_err(|_| anyhow!("URI is not a file"))?;
+ let mut next_group_id = 0;
+ let mut diagnostics = Vec::default();
+ let mut primary_diagnostic_group_ids = HashMap::default();
+ let mut sources_by_group_id = HashMap::default();
+ let mut supporting_diagnostic_severities = HashMap::default();
+ for diagnostic in ¶ms.diagnostics {
+ let source = diagnostic.source.as_ref();
+ let code = diagnostic.code.as_ref().map(|code| match code {
+ lsp::NumberOrString::Number(code) => code.to_string(),
+ lsp::NumberOrString::String(code) => code.clone(),
+ });
+ let range = range_from_lsp(diagnostic.range);
+ let is_supporting = diagnostic
+ .related_information
+ .as_ref()
+ .map_or(false, |infos| {
+ infos.iter().any(|info| {
+ primary_diagnostic_group_ids.contains_key(&(
+ source,
+ code.clone(),
+ range_from_lsp(info.location.range),
+ ))
+ })
+ });
+
+ if is_supporting {
+ if let Some(severity) = diagnostic.severity {
+ supporting_diagnostic_severities
+ .insert((source, code.clone(), range), severity);
+ }
+ } else {
+ let group_id = post_inc(&mut next_group_id);
+ let is_disk_based =
+ source.map_or(false, |source| disk_based_sources.contains(source));
+
+ sources_by_group_id.insert(group_id, source);
+ primary_diagnostic_group_ids
+ .insert((source, code.clone(), range.clone()), group_id);
+
+ diagnostics.push(DiagnosticEntry {
+ range,
+ diagnostic: Diagnostic {
+ code: code.clone(),
+ severity: diagnostic.severity.unwrap_or(DiagnosticSeverity::ERROR),
+ message: diagnostic.message.clone(),
+ group_id,
+ is_primary: true,
+ is_valid: true,
+ is_disk_based,
+ },
+ });
+ if let Some(infos) = &diagnostic.related_information {
+ for info in infos {
+ if info.location.uri == params.uri {
+ let range = range_from_lsp(info.location.range);
+ diagnostics.push(DiagnosticEntry {
+ range,
+ diagnostic: Diagnostic {
+ code: code.clone(),
+ severity: DiagnosticSeverity::INFORMATION,
+ message: info.message.clone(),
+ group_id,
+ is_primary: false,
+ is_valid: true,
+ is_disk_based,
+ },
+ });
+ }
+ }
+ }
+ }
+ }
+
+ for entry in &mut diagnostics {
+ let diagnostic = &mut entry.diagnostic;
+ if !diagnostic.is_primary {
+ let source = *sources_by_group_id.get(&diagnostic.group_id).unwrap();
+ if let Some(&severity) = supporting_diagnostic_severities.get(&(
+ source,
+ diagnostic.code.clone(),
+ entry.range.clone(),
+ )) {
+ diagnostic.severity = severity;
+ }
+ }
+ }
+
+ self.update_diagnostic_entries(abs_path, params.version, diagnostics, cx)?;
+ Ok(())
+ }
+
+ pub fn update_diagnostic_entries(
+ &mut self,
+ abs_path: PathBuf,
+ version: Option<i32>,
+ diagnostics: Vec<DiagnosticEntry<PointUtf16>>,
+ cx: &mut ModelContext<Project>,
+ ) -> Result<(), anyhow::Error> {
let (worktree, relative_path) = self
- .find_worktree_for_abs_path(&path, cx)
+ .find_local_worktree(&abs_path, cx)
.ok_or_else(|| anyhow!("no worktree found for diagnostics"))?;
let project_path = ProjectPath {
worktree_id: worktree.read(cx).id(),
path: relative_path.into(),
};
+
+ for buffer in self.open_buffers.values() {
+ if let Some(buffer) = buffer.upgrade(cx) {
+ if buffer
+ .read(cx)
+ .file()
+ .map_or(false, |file| *file.path() == project_path.path)
+ {
+ buffer.update(cx, |buffer, cx| {
+ buffer.update_diagnostics(version, diagnostics.clone(), cx)
+ })?;
+ break;
+ }
+ }
+ }
worktree.update(cx, |worktree, cx| {
- worktree.as_local_mut().unwrap().update_diagnostics(
- project_path.path.clone(),
- diagnostics,
- disk_based_sources,
- cx,
- )
+ worktree
+ .as_local_mut()
+ .ok_or_else(|| anyhow!("not a local worktree"))?
+ .update_diagnostics(project_path.path.clone(), diagnostics, cx)
})?;
cx.emit(Event::DiagnosticsUpdated(project_path));
Ok(())
@@ -774,15 +1000,14 @@ impl Project {
.to_file_path()
.map_err(|_| anyhow!("invalid target path"))?;
- let (worktree, relative_path) = if let Some(result) = this
- .read_with(&cx, |this, cx| {
- this.find_worktree_for_abs_path(&abs_path, cx)
- }) {
+ let (worktree, relative_path) = if let Some(result) =
+ this.read_with(&cx, |this, cx| this.find_local_worktree(&abs_path, cx))
+ {
result
} else {
- let (worktree, relative_path) = this
+ let worktree = this
.update(&mut cx, |this, cx| {
- this.create_worktree_for_abs_path(&abs_path, true, cx)
+ this.create_local_worktree(&abs_path, true, cx)
})
.await?;
this.update(&mut cx, |this, cx| {
@@ -791,7 +1016,7 @@ impl Project {
lang_server.clone(),
);
});
- (worktree, relative_path)
+ (worktree, PathBuf::new())
};
let project_path = ProjectPath {
@@ -824,34 +1049,23 @@ impl Project {
}
}
- pub fn find_or_create_worktree_for_abs_path(
+ pub fn find_or_create_local_worktree(
&self,
abs_path: impl AsRef<Path>,
weak: bool,
cx: &mut ModelContext<Self>,
) -> Task<Result<(ModelHandle<Worktree>, PathBuf)>> {
let abs_path = abs_path.as_ref();
- if let Some((tree, relative_path)) = self.find_worktree_for_abs_path(abs_path, cx) {
+ if let Some((tree, relative_path)) = self.find_local_worktree(abs_path, cx) {
Task::ready(Ok((tree.clone(), relative_path.into())))
} else {
- self.create_worktree_for_abs_path(abs_path, weak, cx)
+ let worktree = self.create_local_worktree(abs_path, weak, cx);
+ cx.foreground()
+ .spawn(async move { Ok((worktree.await?, PathBuf::new())) })
}
}
- fn create_worktree_for_abs_path(
- &self,
- abs_path: &Path,
- weak: bool,
- cx: &mut ModelContext<Self>,
- ) -> Task<Result<(ModelHandle<Worktree>, PathBuf)>> {
- let worktree = self.add_local_worktree(abs_path, weak, cx);
- cx.background().spawn(async move {
- let worktree = worktree.await?;
- Ok((worktree, PathBuf::new()))
- })
- }
-
- fn find_worktree_for_abs_path(
+ fn find_local_worktree(
&self,
abs_path: &Path,
cx: &AppContext,
@@ -875,7 +1089,7 @@ impl Project {
}
}
- fn add_local_worktree(
+ fn create_local_worktree(
&self,
abs_path: impl AsRef<Path>,
weak: bool,
@@ -883,11 +1097,9 @@ impl Project {
) -> Task<Result<ModelHandle<Worktree>>> {
let fs = self.fs.clone();
let client = self.client.clone();
- let user_store = self.user_store.clone();
let path = Arc::from(abs_path.as_ref());
cx.spawn(|project, mut cx| async move {
- let worktree =
- Worktree::open_local(client.clone(), user_store, path, weak, fs, &mut cx).await?;
+ let worktree = Worktree::local(client.clone(), path, weak, fs, &mut cx).await?;
let (remote_project_id, is_shared) = project.update(&mut cx, |project, cx| {
project.add_worktree(&worktree, cx);
@@ -903,7 +1115,7 @@ impl Project {
if is_shared {
worktree
.update(&mut cx, |worktree, cx| {
- worktree.as_local_mut().unwrap().share(cx)
+ worktree.as_local_mut().unwrap().share(project_id, cx)
})
.await?;
}
@@ -924,6 +1136,10 @@ impl Project {
fn add_worktree(&mut self, worktree: &ModelHandle<Worktree>, cx: &mut ModelContext<Self>) {
cx.observe(&worktree, |_, _, cx| cx.notify()).detach();
+ cx.subscribe(&worktree, |this, worktree, _, cx| {
+ this.update_open_buffers(worktree, cx)
+ })
+ .detach();
let push_weak_handle = {
let worktree = worktree.read(cx);
@@ -945,6 +1161,74 @@ impl Project {
cx.notify();
}
+ fn update_open_buffers(
+ &mut self,
+ worktree_handle: ModelHandle<Worktree>,
+ cx: &mut ModelContext<Self>,
+ ) {
+ let local = worktree_handle.read(cx).is_local();
+ let snapshot = worktree_handle.read(cx).snapshot();
+ let worktree_path = snapshot.abs_path();
+ let mut buffers_to_delete = Vec::new();
+ for (buffer_id, buffer) in &self.open_buffers {
+ if let OpenBuffer::Loaded(buffer) = buffer {
+ if let Some(buffer) = buffer.upgrade(cx) {
+ buffer.update(cx, |buffer, cx| {
+ if let Some(old_file) = File::from_dyn(buffer.file()) {
+ if old_file.worktree != worktree_handle {
+ return;
+ }
+
+ let new_file = if let Some(entry) = old_file
+ .entry_id
+ .and_then(|entry_id| snapshot.entry_for_id(entry_id))
+ {
+ File {
+ is_local: local,
+ worktree_path: worktree_path.clone(),
+ entry_id: Some(entry.id),
+ mtime: entry.mtime,
+ path: entry.path.clone(),
+ worktree: worktree_handle.clone(),
+ }
+ } else if let Some(entry) =
+ snapshot.entry_for_path(old_file.path().as_ref())
+ {
+ File {
+ is_local: local,
+ worktree_path: worktree_path.clone(),
+ entry_id: Some(entry.id),
+ mtime: entry.mtime,
+ path: entry.path.clone(),
+ worktree: worktree_handle.clone(),
+ }
+ } else {
+ File {
+ is_local: local,
+ worktree_path: worktree_path.clone(),
+ entry_id: None,
+ path: old_file.path().clone(),
+ mtime: old_file.mtime(),
+ worktree: worktree_handle.clone(),
+ }
+ };
+
+ if let Some(task) = buffer.file_updated(Box::new(new_file), cx) {
+ task.detach();
+ }
+ }
+ });
+ } else {
+ buffers_to_delete.push(*buffer_id);
+ }
+ }
+ }
+
+ for buffer_id in buffers_to_delete {
+ self.open_buffers.remove(&buffer_id);
+ }
+ }
+
pub fn set_active_path(&mut self, entry: Option<ProjectPath>, cx: &mut ModelContext<Self>) {
let new_active_entry = entry.and_then(|project_path| {
let worktree = self.worktree_for_id(project_path.worktree_id, cx)?;
@@ -988,14 +1272,14 @@ impl Project {
})
}
- fn disk_based_diagnostics_started(&mut self, cx: &mut ModelContext<Self>) {
+ pub fn disk_based_diagnostics_started(&mut self, cx: &mut ModelContext<Self>) {
self.language_servers_with_diagnostics_running += 1;
if self.language_servers_with_diagnostics_running == 1 {
cx.emit(Event::DiskBasedDiagnosticsStarted);
}
}
- fn disk_based_diagnostics_finished(&mut self, cx: &mut ModelContext<Self>) {
+ pub fn disk_based_diagnostics_finished(&mut self, cx: &mut ModelContext<Self>) {
cx.emit(Event::DiskBasedDiagnosticsUpdated);
self.language_servers_with_diagnostics_running -= 1;
if self.language_servers_with_diagnostics_running == 0 {
@@ -1072,11 +1356,15 @@ impl Project {
.remove(&peer_id)
.ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))?
.replica_id;
- for worktree in self.worktrees(cx).collect::<Vec<_>>() {
- worktree.update(cx, |worktree, cx| {
- worktree.remove_collaborator(peer_id, replica_id, cx);
- })
+ self.shared_buffers.remove(&peer_id);
+ for (_, buffer) in &self.open_buffers {
+ if let OpenBuffer::Loaded(buffer) = buffer {
+ if let Some(buffer) = buffer.upgrade(cx) {
+ buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
+ }
+ }
}
+ cx.notify();
Ok(())
}
@@ -1092,12 +1380,10 @@ impl Project {
.payload
.worktree
.ok_or_else(|| anyhow!("invalid worktree"))?;
- let user_store = self.user_store.clone();
cx.spawn(|this, mut cx| {
async move {
let worktree =
- Worktree::remote(remote_id, replica_id, worktree, client, user_store, &mut cx)
- .await?;
+ Worktree::remote(remote_id, replica_id, worktree, client, &mut cx).await?;
this.update(&mut cx, |this, cx| this.add_worktree(&worktree, cx));
Ok(())
}
@@ -1185,11 +1471,27 @@ impl Project {
_: Arc<Client>,
cx: &mut ModelContext<Self>,
) -> Result<()> {
- let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
- if let Some(worktree) = self.worktree_for_id(worktree_id, cx) {
- worktree.update(cx, |worktree, cx| {
- worktree.handle_update_buffer(envelope, cx)
- })?;
+ let payload = envelope.payload.clone();
+ let buffer_id = payload.buffer_id as usize;
+ let ops = payload
+ .operations
+ .into_iter()
+ .map(|op| language::proto::deserialize_operation(op))
+ .collect::<Result<Vec<_>, _>>()?;
+ match self.open_buffers.get_mut(&buffer_id) {
+ Some(OpenBuffer::Operations(pending_ops)) => pending_ops.extend(ops),
+ Some(OpenBuffer::Loaded(buffer)) => {
+ if let Some(buffer) = buffer.upgrade(cx) {
+ buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
+ } else {
+ self.open_buffers
+ .insert(buffer_id, OpenBuffer::Operations(ops));
+ }
+ }
+ None => {
+ self.open_buffers
+ .insert(buffer_id, OpenBuffer::Operations(ops));
+ }
}
Ok(())
}
@@ -1200,12 +1502,40 @@ impl Project {
rpc: Arc<Client>,
cx: &mut ModelContext<Self>,
) -> Result<()> {
- let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
- if let Some(worktree) = self.worktree_for_id(worktree_id, cx) {
- worktree.update(cx, |worktree, cx| {
- worktree.handle_save_buffer(envelope, rpc, cx)
- })?;
- }
+ let sender_id = envelope.original_sender_id()?;
+ let project_id = self.remote_id().ok_or_else(|| anyhow!("not connected"))?;
+ let buffer = self
+ .shared_buffers
+ .get(&sender_id)
+ .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned())
+ .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?;
+ let receipt = envelope.receipt();
+ let buffer_id = envelope.payload.buffer_id;
+ let save = cx.spawn(|_, mut cx| async move {
+ buffer.update(&mut cx, |buffer, cx| buffer.save(cx)).await
+ });
+
+ cx.background()
+ .spawn(
+ async move {
+ let (version, mtime) = save.await?;
+
+ rpc.respond(
+ receipt,
+ proto::BufferSaved {
+ project_id,
+ buffer_id,
+ version: (&version).into(),
+ mtime: Some(mtime.into()),
+ },
+ )
+ .await?;
+
+ Ok(())
+ }
+ .log_err(),
+ )
+ .detach();
Ok(())
}
@@ -1215,12 +1545,36 @@ impl Project {
rpc: Arc<Client>,
cx: &mut ModelContext<Self>,
) -> Result<()> {
- let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
- if let Some(worktree) = self.worktree_for_id(worktree_id, cx) {
- worktree.update(cx, |worktree, cx| {
- worktree.handle_format_buffer(envelope, rpc, cx)
- })?;
- }
+ let receipt = envelope.receipt();
+ let sender_id = envelope.original_sender_id()?;
+ let buffer = self
+ .shared_buffers
+ .get(&sender_id)
+ .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned())
+ .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?;
+ cx.spawn(|_, mut cx| async move {
+ let format = buffer.update(&mut cx, |buffer, cx| buffer.format(cx)).await;
+ // We spawn here in order to enqueue the sending of `Ack` *after* transmission of edits
+ // associated with formatting.
+ cx.spawn(|_| async move {
+ match format {
+ Ok(()) => rpc.respond(receipt, proto::Ack {}).await?,
+ Err(error) => {
+ rpc.respond_with_error(
+ receipt,
+ proto::Error {
+ message: error.to_string(),
+ },
+ )
+ .await?
+ }
+ }
+ Ok::<_, anyhow::Error>(())
+ })
+ .await
+ .log_err();
+ })
+ .detach();
Ok(())
}
@@ -1233,28 +1587,30 @@ impl Project {
let receipt = envelope.receipt();
let peer_id = envelope.original_sender_id()?;
let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
- let worktree = self
- .worktree_for_id(worktree_id, cx)
- .ok_or_else(|| anyhow!("no such worktree"))?;
-
- let task = self.open_buffer(
+ let open_buffer = self.open_buffer(
ProjectPath {
worktree_id,
path: PathBuf::from(envelope.payload.path).into(),
},
cx,
);
- cx.spawn(|_, mut cx| {
+ cx.spawn(|this, mut cx| {
async move {
- let buffer = task.await?;
- let response = worktree.update(&mut cx, |worktree, cx| {
- worktree
- .as_local_mut()
- .unwrap()
- .open_remote_buffer(peer_id, buffer, cx)
+ let buffer = open_buffer.await?;
+ this.update(&mut cx, |this, _| {
+ this.shared_buffers
+ .entry(peer_id)
+ .or_default()
+ .insert(buffer.id() as u64, buffer.clone());
});
- rpc.respond(receipt, response).await?;
- Ok(())
+ let message = buffer.read_with(&cx, |buffer, _| buffer.to_proto());
+ rpc.respond(
+ receipt,
+ proto::OpenBufferResponse {
+ buffer: Some(message),
+ },
+ )
+ .await
}
.log_err()
})
@@ -1268,14 +1624,9 @@ impl Project {
_: Arc<Client>,
cx: &mut ModelContext<Self>,
) -> anyhow::Result<()> {
- let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
- if let Some(worktree) = self.worktree_for_id(worktree_id, cx) {
- worktree.update(cx, |worktree, cx| {
- worktree
- .as_local_mut()
- .unwrap()
- .close_remote_buffer(envelope, cx)
- })?;
+ if let Some(shared_buffers) = self.shared_buffers.get_mut(&envelope.original_sender_id()?) {
+ shared_buffers.remove(&envelope.payload.buffer_id);
+ cx.notify();
}
Ok(())
}
@@ -1286,10 +1637,26 @@ impl Project {
_: Arc<Client>,
cx: &mut ModelContext<Self>,
) -> Result<()> {
- let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id);
- if let Some(worktree) = self.worktree_for_id(worktree_id, cx) {
- worktree.update(cx, |worktree, cx| {
- worktree.handle_buffer_saved(envelope, cx)
+ let payload = envelope.payload.clone();
+ let buffer = self
+ .open_buffers
+ .get(&(payload.buffer_id as usize))
+ .and_then(|buf| {
+ if let OpenBuffer::Loaded(buffer) = buf {
+ buffer.upgrade(cx)
+ } else {
+ None
+ }
+ });
+ if let Some(buffer) = buffer {
+ buffer.update(cx, |buffer, cx| {
+ let version = payload.version.try_into()?;
+ let mtime = payload
+ .mtime
+ .ok_or_else(|| anyhow!("missing mtime"))?
+ .into();
+ buffer.did_save(version, mtime, None, cx);
+ Result::<_, anyhow::Error>::Ok(())
})?;
}
Ok(())
@@ -1470,6 +1837,24 @@ impl Collaborator {
}
}
+impl<P: AsRef<Path>> From<(WorktreeId, P)> for ProjectPath {
+ fn from((worktree_id, path): (WorktreeId, P)) -> Self {
+ Self {
+ worktree_id,
+ path: path.as_ref().into(),
+ }
+ }
+}
+
+impl OpenBuffer {
+ fn upgrade(&self, cx: &AppContext) -> Option<ModelHandle<Buffer>> {
+ match self {
+ OpenBuffer::Loaded(buffer) => buffer.upgrade(cx),
+ OpenBuffer::Operations(_) => None,
+ }
+ }
+}
+
#[cfg(test)]
mod tests {
use super::{Event, *};
@@ -1483,8 +1868,10 @@ mod tests {
};
use lsp::Url;
use serde_json::json;
- use std::{os::unix, path::PathBuf};
+ use std::{cell::RefCell, os::unix, path::PathBuf, rc::Rc};
+ use unindent::Unindent as _;
use util::test::temp_tree;
+ use worktree::WorktreeHandle as _;
#[gpui::test]
async fn test_populate_and_search(mut cx: gpui::TestAppContext) {
@@ -1511,11 +1898,11 @@ mod tests {
)
.unwrap();
- let project = build_project(&mut cx);
+ let project = build_project(Arc::new(RealFs), &mut cx);
let (tree, _) = project
.update(&mut cx, |project, cx| {
- project.find_or_create_worktree_for_abs_path(&root_link_path, false, cx)
+ project.find_or_create_local_worktree(&root_link_path, false, cx)
})
.await
.unwrap();
@@ -1590,7 +1977,7 @@ mod tests {
let (tree, _) = project
.update(&mut cx, |project, cx| {
- project.find_or_create_worktree_for_abs_path(dir.path(), false, cx)
+ project.find_or_create_local_worktree(dir.path(), false, cx)
})
.await
.unwrap();
@@ -1656,8 +2043,8 @@ mod tests {
Event::DiskBasedDiagnosticsFinished
);
- let (buffer, _) = tree
- .update(&mut cx, |tree, cx| tree.open_buffer("a.rs", cx))
+ let buffer = project
+ .update(&mut cx, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
.await
.unwrap();
@@ -1693,10 +2080,10 @@ mod tests {
}
}));
- let project = build_project(&mut cx);
+ let project = build_project(Arc::new(RealFs), &mut cx);
let (tree, _) = project
.update(&mut cx, |project, cx| {
- project.find_or_create_worktree_for_abs_path(&dir.path(), false, cx)
+ project.find_or_create_local_worktree(&dir.path(), false, cx)
})
.await
.unwrap();
@@ -1750,7 +2137,7 @@ mod tests {
let (tree, _) = project
.update(&mut cx, |project, cx| {
- project.find_or_create_worktree_for_abs_path(dir.path().join("b.rs"), false, cx)
+ project.find_or_create_local_worktree(dir.path().join("b.rs"), false, cx)
})
.await
.unwrap();
@@ -5,19 +5,16 @@ use super::{
};
use ::ignore::gitignore::{Gitignore, GitignoreBuilder};
use anyhow::{anyhow, Result};
-use client::{proto, Client, PeerId, TypedEnvelope, UserStore};
+use client::{proto, Client, TypedEnvelope};
use clock::ReplicaId;
-use collections::{hash_map, HashMap, HashSet};
+use collections::HashMap;
use futures::{Stream, StreamExt};
use fuzzy::CharBag;
use gpui::{
executor, AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext,
- Task, UpgradeModelHandle, WeakModelHandle,
-};
-use language::{
- range_from_lsp, Buffer, Diagnostic, DiagnosticEntry, DiagnosticSeverity, File as _, Operation,
- PointUtf16, Rope,
+ Task,
};
+use language::{Buffer, DiagnosticEntry, Operation, PointUtf16, Rope};
use lazy_static::lazy_static;
use parking_lot::Mutex;
use postage::{
@@ -36,19 +33,69 @@ use std::{
ops::Deref,
path::{Path, PathBuf},
sync::{
- atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
+ atomic::{AtomicUsize, Ordering::SeqCst},
Arc,
},
time::{Duration, SystemTime},
};
-use sum_tree::{Bias, TreeMap};
-use sum_tree::{Edit, SeekTarget, SumTree};
-use util::{post_inc, ResultExt, TryFutureExt};
+use sum_tree::{Bias, Edit, SeekTarget, SumTree, TreeMap};
+use util::ResultExt;
lazy_static! {
static ref GITIGNORE: &'static OsStr = OsStr::new(".gitignore");
}
+#[derive(Copy, Clone, PartialEq, Eq, Debug, Hash, PartialOrd, Ord)]
+pub struct WorktreeId(usize);
+
+pub enum Worktree {
+ Local(LocalWorktree),
+ Remote(RemoteWorktree),
+}
+
+pub struct LocalWorktree {
+ snapshot: Snapshot,
+ config: WorktreeConfig,
+ background_snapshot: Arc<Mutex<Snapshot>>,
+ last_scan_state_rx: watch::Receiver<ScanState>,
+ _background_scanner_task: Option<Task<()>>,
+ poll_task: Option<Task<()>>,
+ registration: Registration,
+ share: Option<ShareState>,
+ diagnostics: HashMap<Arc<Path>, Vec<DiagnosticEntry<PointUtf16>>>,
+ diagnostic_summaries: TreeMap<PathKey, DiagnosticSummary>,
+ queued_operations: Vec<(u64, Operation)>,
+ client: Arc<Client>,
+ fs: Arc<dyn Fs>,
+ weak: bool,
+}
+
+pub struct RemoteWorktree {
+ pub(crate) snapshot: Snapshot,
+ project_id: u64,
+ snapshot_rx: watch::Receiver<Snapshot>,
+ client: Arc<Client>,
+ updates_tx: postage::mpsc::Sender<proto::UpdateWorktree>,
+ replica_id: ReplicaId,
+ queued_operations: Vec<(u64, Operation)>,
+ diagnostic_summaries: TreeMap<PathKey, DiagnosticSummary>,
+ weak: bool,
+}
+
+#[derive(Clone)]
+pub struct Snapshot {
+ id: WorktreeId,
+ scan_id: usize,
+ abs_path: Arc<Path>,
+ root_name: String,
+ root_char_bag: CharBag,
+ ignores: HashMap<Arc<Path>, (Arc<Gitignore>, usize)>,
+ entries_by_path: SumTree<Entry>,
+ entries_by_id: SumTree<PathEntry>,
+ removed_entry_ids: HashMap<u64, usize>,
+ next_entry_id: Arc<AtomicUsize>,
+}
+
#[derive(Clone, Debug)]
enum ScanState {
Idle,
@@ -56,16 +103,30 @@ enum ScanState {
Err(Arc<anyhow::Error>),
}
-#[derive(Copy, Clone, PartialEq, Eq, Debug, Hash, PartialOrd, Ord)]
-pub struct WorktreeId(usize);
+#[derive(Debug, Eq, PartialEq)]
+enum Registration {
+ None,
+ Pending,
+ Done { project_id: u64 },
+}
-pub enum Worktree {
- Local(LocalWorktree),
- Remote(RemoteWorktree),
+struct ShareState {
+ project_id: u64,
+ snapshots_tx: Sender<Snapshot>,
+ _maintain_remote_snapshot: Option<Task<()>>,
+}
+
+#[derive(Default, Deserialize)]
+struct WorktreeConfig {
+ collaborators: Vec<String>,
+}
+
+pub enum Event {
+ UpdatedEntries,
}
impl Entity for Worktree {
- type Event = ();
+ type Event = Event;
fn release(&mut self, cx: &mut MutableAppContext) {
if let Some(worktree) = self.as_local_mut() {
@@ -87,16 +148,14 @@ impl Entity for Worktree {
}
impl Worktree {
- pub async fn open_local(
+ pub async fn local(
client: Arc<Client>,
- user_store: ModelHandle<UserStore>,
path: impl Into<Arc<Path>>,
weak: bool,
fs: Arc<dyn Fs>,
cx: &mut AsyncAppContext,
) -> Result<ModelHandle<Self>> {
- let (tree, scan_states_tx) =
- LocalWorktree::new(client, user_store, path, weak, fs.clone(), cx).await?;
+ let (tree, scan_states_tx) = LocalWorktree::new(client, path, weak, fs.clone(), cx).await?;
tree.update(cx, |tree, cx| {
let tree = tree.as_local_mut().unwrap();
let abs_path = tree.snapshot.abs_path.clone();
@@ -117,7 +176,6 @@ impl Worktree {
replica_id: ReplicaId,
worktree: proto::Worktree,
client: Arc<Client>,
- user_store: ModelHandle<UserStore>,
cx: &mut AsyncAppContext,
) -> Result<ModelHandle<Self>> {
let remote_id = worktree.id;
@@ -222,10 +280,7 @@ impl Worktree {
snapshot_rx,
updates_tx,
client: client.clone(),
- loading_buffers: Default::default(),
- open_buffers: Default::default(),
queued_operations: Default::default(),
- user_store,
diagnostic_summaries,
weak,
})
@@ -292,22 +347,14 @@ impl Worktree {
}
}
- pub fn remove_collaborator(
+ pub fn load_buffer(
&mut self,
- peer_id: PeerId,
- replica_id: ReplicaId,
+ path: &Path,
cx: &mut ModelContext<Self>,
- ) {
- match self {
- Worktree::Local(worktree) => worktree.remove_collaborator(peer_id, replica_id, cx),
- Worktree::Remote(worktree) => worktree.remove_collaborator(replica_id, cx),
- }
- }
-
- pub fn user_store(&self) -> &ModelHandle<UserStore> {
+ ) -> Task<Result<ModelHandle<Buffer>>> {
match self {
- Worktree::Local(worktree) => &worktree.user_store,
- Worktree::Remote(worktree) => &worktree.user_store,
+ Worktree::Local(worktree) => worktree.load_buffer(path, cx),
+ Worktree::Remote(worktree) => worktree.load_buffer(path, cx),
}
}
@@ -322,267 +369,6 @@ impl Worktree {
.map(|(path, summary)| (path.0.clone(), summary.clone()))
}
- pub fn loading_buffers<'a>(&'a mut self) -> &'a mut LoadingBuffers {
- match self {
- Worktree::Local(worktree) => &mut worktree.loading_buffers,
- Worktree::Remote(worktree) => &mut worktree.loading_buffers,
- }
- }
-
- pub fn open_buffer(
- &mut self,
- path: impl AsRef<Path>,
- cx: &mut ModelContext<Self>,
- ) -> Task<Result<(ModelHandle<Buffer>, bool)>> {
- let path = path.as_ref();
-
- // If there is already a buffer for the given path, then return it.
- let existing_buffer = match self {
- Worktree::Local(worktree) => worktree.get_open_buffer(path, cx),
- Worktree::Remote(worktree) => worktree.get_open_buffer(path, cx),
- };
- if let Some(existing_buffer) = existing_buffer {
- return cx.spawn(move |_, _| async move { Ok((existing_buffer, false)) });
- }
-
- let is_new = Arc::new(AtomicBool::new(true));
- let path: Arc<Path> = Arc::from(path);
- let mut loading_watch = match self.loading_buffers().entry(path.clone()) {
- // If the given path is already being loaded, then wait for that existing
- // task to complete and return the same buffer.
- hash_map::Entry::Occupied(e) => e.get().clone(),
-
- // Otherwise, record the fact that this path is now being loaded.
- hash_map::Entry::Vacant(entry) => {
- let (mut tx, rx) = postage::watch::channel();
- entry.insert(rx.clone());
-
- let load_buffer = match self {
- Worktree::Local(worktree) => worktree.open_buffer(&path, cx),
- Worktree::Remote(worktree) => worktree.open_buffer(&path, cx),
- };
- cx.spawn(move |this, mut cx| async move {
- let result = load_buffer.await;
-
- // After the buffer loads, record the fact that it is no longer
- // loading.
- this.update(&mut cx, |this, _| this.loading_buffers().remove(&path));
- *tx.borrow_mut() = Some(match result {
- Ok(buffer) => Ok((buffer, is_new)),
- Err(error) => Err(Arc::new(error)),
- });
- })
- .detach();
- rx
- }
- };
-
- cx.spawn(|_, _| async move {
- loop {
- if let Some(result) = loading_watch.borrow().as_ref() {
- return match result {
- Ok((buf, is_new)) => Ok((buf.clone(), is_new.fetch_and(false, SeqCst))),
- Err(error) => Err(anyhow!("{}", error)),
- };
- }
- loading_watch.recv().await;
- }
- })
- }
-
- #[cfg(feature = "test-support")]
- pub fn has_open_buffer(&self, path: impl AsRef<Path>, cx: &AppContext) -> bool {
- let mut open_buffers: Box<dyn Iterator<Item = _>> = match self {
- Worktree::Local(worktree) => Box::new(worktree.open_buffers.values()),
- Worktree::Remote(worktree) => {
- Box::new(worktree.open_buffers.values().filter_map(|buf| {
- if let RemoteBuffer::Loaded(buf) = buf {
- Some(buf)
- } else {
- None
- }
- }))
- }
- };
-
- let path = path.as_ref();
- open_buffers
- .find(|buffer| {
- if let Some(file) = buffer.upgrade(cx).and_then(|buffer| buffer.read(cx).file()) {
- file.path().as_ref() == path
- } else {
- false
- }
- })
- .is_some()
- }
-
- pub fn handle_update_buffer(
- &mut self,
- envelope: TypedEnvelope<proto::UpdateBuffer>,
- cx: &mut ModelContext<Self>,
- ) -> Result<()> {
- let payload = envelope.payload.clone();
- let buffer_id = payload.buffer_id as usize;
- let ops = payload
- .operations
- .into_iter()
- .map(|op| language::proto::deserialize_operation(op))
- .collect::<Result<Vec<_>, _>>()?;
-
- match self {
- Worktree::Local(worktree) => {
- let buffer = worktree
- .open_buffers
- .get(&buffer_id)
- .and_then(|buf| buf.upgrade(cx))
- .ok_or_else(|| {
- anyhow!("invalid buffer {} in update buffer message", buffer_id)
- })?;
- buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
- }
- Worktree::Remote(worktree) => match worktree.open_buffers.get_mut(&buffer_id) {
- Some(RemoteBuffer::Operations(pending_ops)) => pending_ops.extend(ops),
- Some(RemoteBuffer::Loaded(buffer)) => {
- if let Some(buffer) = buffer.upgrade(cx) {
- buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
- } else {
- worktree
- .open_buffers
- .insert(buffer_id, RemoteBuffer::Operations(ops));
- }
- }
- None => {
- worktree
- .open_buffers
- .insert(buffer_id, RemoteBuffer::Operations(ops));
- }
- },
- }
-
- Ok(())
- }
-
- pub fn handle_save_buffer(
- &mut self,
- envelope: TypedEnvelope<proto::SaveBuffer>,
- rpc: Arc<Client>,
- cx: &mut ModelContext<Self>,
- ) -> Result<()> {
- let sender_id = envelope.original_sender_id()?;
- let this = self.as_local().unwrap();
- let project_id = this
- .share
- .as_ref()
- .ok_or_else(|| anyhow!("can't save buffer while disconnected"))?
- .project_id;
-
- let buffer = this
- .shared_buffers
- .get(&sender_id)
- .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned())
- .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?;
-
- let receipt = envelope.receipt();
- let worktree_id = envelope.payload.worktree_id;
- let buffer_id = envelope.payload.buffer_id;
- let save = cx.spawn(|_, mut cx| async move {
- buffer.update(&mut cx, |buffer, cx| buffer.save(cx)).await
- });
-
- cx.background()
- .spawn(
- async move {
- let (version, mtime) = save.await?;
-
- rpc.respond(
- receipt,
- proto::BufferSaved {
- project_id,
- worktree_id,
- buffer_id,
- version: (&version).into(),
- mtime: Some(mtime.into()),
- },
- )
- .await?;
-
- Ok(())
- }
- .log_err(),
- )
- .detach();
-
- Ok(())
- }
-
- pub fn handle_buffer_saved(
- &mut self,
- envelope: TypedEnvelope<proto::BufferSaved>,
- cx: &mut ModelContext<Self>,
- ) -> Result<()> {
- let payload = envelope.payload.clone();
- let worktree = self.as_remote_mut().unwrap();
- if let Some(buffer) = worktree
- .open_buffers
- .get(&(payload.buffer_id as usize))
- .and_then(|buf| buf.upgrade(cx))
- {
- buffer.update(cx, |buffer, cx| {
- let version = payload.version.try_into()?;
- let mtime = payload
- .mtime
- .ok_or_else(|| anyhow!("missing mtime"))?
- .into();
- buffer.did_save(version, mtime, None, cx);
- Result::<_, anyhow::Error>::Ok(())
- })?;
- }
- Ok(())
- }
-
- pub fn handle_format_buffer(
- &mut self,
- envelope: TypedEnvelope<proto::FormatBuffer>,
- rpc: Arc<Client>,
- cx: &mut ModelContext<Self>,
- ) -> Result<()> {
- let sender_id = envelope.original_sender_id()?;
- let this = self.as_local().unwrap();
- let buffer = this
- .shared_buffers
- .get(&sender_id)
- .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned())
- .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?;
-
- let receipt = envelope.receipt();
- cx.spawn(|_, mut cx| async move {
- let format = buffer.update(&mut cx, |buffer, cx| buffer.format(cx)).await;
- // We spawn here in order to enqueue the sending of `Ack` *after* transmission of edits
- // associated with formatting.
- cx.spawn(|_| async move {
- match format {
- Ok(()) => rpc.respond(receipt, proto::Ack {}).await?,
- Err(error) => {
- rpc.respond_with_error(
- receipt,
- proto::Error {
- message: error.to_string(),
- },
- )
- .await?
- }
- }
- Ok::<_, anyhow::Error>(())
- })
- .await
- .log_err();
- })
- .detach();
-
- Ok(())
- }
-
fn poll_snapshot(&mut self, cx: &mut ModelContext<Self>) {
match self {
Self::Local(worktree) => {
@@ -604,116 +390,35 @@ impl Worktree {
}
} else {
worktree.poll_task.take();
- self.update_open_buffers(cx);
+ cx.emit(Event::UpdatedEntries);
}
}
Self::Remote(worktree) => {
worktree.snapshot = worktree.snapshot_rx.borrow().clone();
- self.update_open_buffers(cx);
+ cx.emit(Event::UpdatedEntries);
}
};
cx.notify();
}
- fn update_open_buffers(&mut self, cx: &mut ModelContext<Self>) {
- let open_buffers: Box<dyn Iterator<Item = _>> = match &self {
- Self::Local(worktree) => Box::new(worktree.open_buffers.iter()),
- Self::Remote(worktree) => {
- Box::new(worktree.open_buffers.iter().filter_map(|(id, buf)| {
- if let RemoteBuffer::Loaded(buf) = buf {
- Some((id, buf))
- } else {
- None
- }
- }))
- }
- };
-
- let local = self.as_local().is_some();
- let worktree_path = self.abs_path.clone();
- let worktree_handle = cx.handle();
- let mut buffers_to_delete = Vec::new();
- for (buffer_id, buffer) in open_buffers {
- if let Some(buffer) = buffer.upgrade(cx) {
- buffer.update(cx, |buffer, cx| {
- if let Some(old_file) = File::from_dyn(buffer.file()) {
- let new_file = if let Some(entry) = old_file
- .entry_id
- .and_then(|entry_id| self.entry_for_id(entry_id))
- {
- File {
- is_local: local,
- worktree_path: worktree_path.clone(),
- entry_id: Some(entry.id),
- mtime: entry.mtime,
- path: entry.path.clone(),
- worktree: worktree_handle.clone(),
- }
- } else if let Some(entry) = self.entry_for_path(old_file.path().as_ref()) {
- File {
- is_local: local,
- worktree_path: worktree_path.clone(),
- entry_id: Some(entry.id),
- mtime: entry.mtime,
- path: entry.path.clone(),
- worktree: worktree_handle.clone(),
- }
- } else {
- File {
- is_local: local,
- worktree_path: worktree_path.clone(),
- entry_id: None,
- path: old_file.path().clone(),
- mtime: old_file.mtime(),
- worktree: worktree_handle.clone(),
- }
- };
-
- if let Some(task) = buffer.file_updated(Box::new(new_file), cx) {
- task.detach();
- }
- }
- });
- } else {
- buffers_to_delete.push(*buffer_id);
- }
- }
-
- for buffer_id in buffers_to_delete {
- match self {
- Self::Local(worktree) => {
- worktree.open_buffers.remove(&buffer_id);
- }
- Self::Remote(worktree) => {
- worktree.open_buffers.remove(&buffer_id);
- }
- }
- }
- }
-
fn send_buffer_update(
&mut self,
buffer_id: u64,
operation: Operation,
cx: &mut ModelContext<Self>,
) {
- if let Some((project_id, worktree_id, rpc)) = match self {
+ if let Some((project_id, rpc)) = match self {
Worktree::Local(worktree) => worktree
.share
.as_ref()
- .map(|share| (share.project_id, worktree.id(), worktree.client.clone())),
- Worktree::Remote(worktree) => Some((
- worktree.project_id,
- worktree.snapshot.id(),
- worktree.client.clone(),
- )),
+ .map(|share| (share.project_id, worktree.client.clone())),
+ Worktree::Remote(worktree) => Some((worktree.project_id, worktree.client.clone())),
} {
cx.spawn(|worktree, mut cx| async move {
if let Err(error) = rpc
.request(proto::UpdateBuffer {
project_id,
- worktree_id: worktree_id.0 as u64,
buffer_id,
operations: vec![language::proto::serialize_operation(&operation)],
})
@@ -734,132 +439,32 @@ impl Worktree {
}
}
-impl WorktreeId {
- pub fn from_usize(handle_id: usize) -> Self {
- Self(handle_id)
- }
-
- pub(crate) fn from_proto(id: u64) -> Self {
- Self(id as usize)
- }
-
- pub fn to_proto(&self) -> u64 {
- self.0 as u64
- }
+impl LocalWorktree {
+ async fn new(
+ client: Arc<Client>,
+ path: impl Into<Arc<Path>>,
+ weak: bool,
+ fs: Arc<dyn Fs>,
+ cx: &mut AsyncAppContext,
+ ) -> Result<(ModelHandle<Worktree>, Sender<ScanState>)> {
+ let abs_path = path.into();
+ let path: Arc<Path> = Arc::from(Path::new(""));
+ let next_entry_id = AtomicUsize::new(0);
- pub fn to_usize(&self) -> usize {
- self.0
- }
-}
+ // After determining whether the root entry is a file or a directory, populate the
+ // snapshot's "root name", which will be used for the purpose of fuzzy matching.
+ let root_name = abs_path
+ .file_name()
+ .map_or(String::new(), |f| f.to_string_lossy().to_string());
+ let root_char_bag = root_name.chars().map(|c| c.to_ascii_lowercase()).collect();
+ let metadata = fs.metadata(&abs_path).await?;
-impl fmt::Display for WorktreeId {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- self.0.fmt(f)
- }
-}
-
-#[derive(Clone)]
-pub struct Snapshot {
- id: WorktreeId,
- scan_id: usize,
- abs_path: Arc<Path>,
- root_name: String,
- root_char_bag: CharBag,
- ignores: HashMap<Arc<Path>, (Arc<Gitignore>, usize)>,
- entries_by_path: SumTree<Entry>,
- entries_by_id: SumTree<PathEntry>,
- removed_entry_ids: HashMap<u64, usize>,
- next_entry_id: Arc<AtomicUsize>,
-}
-
-pub struct LocalWorktree {
- snapshot: Snapshot,
- config: WorktreeConfig,
- background_snapshot: Arc<Mutex<Snapshot>>,
- last_scan_state_rx: watch::Receiver<ScanState>,
- _background_scanner_task: Option<Task<()>>,
- poll_task: Option<Task<()>>,
- registration: Registration,
- share: Option<ShareState>,
- loading_buffers: LoadingBuffers,
- open_buffers: HashMap<usize, WeakModelHandle<Buffer>>,
- shared_buffers: HashMap<PeerId, HashMap<u64, ModelHandle<Buffer>>>,
- diagnostics: HashMap<Arc<Path>, Vec<DiagnosticEntry<PointUtf16>>>,
- diagnostic_summaries: TreeMap<PathKey, DiagnosticSummary>,
- queued_operations: Vec<(u64, Operation)>,
- client: Arc<Client>,
- user_store: ModelHandle<UserStore>,
- fs: Arc<dyn Fs>,
- weak: bool,
-}
-
-#[derive(Debug, Eq, PartialEq)]
-enum Registration {
- None,
- Pending,
- Done { project_id: u64 },
-}
-
-struct ShareState {
- project_id: u64,
- snapshots_tx: Sender<Snapshot>,
- _maintain_remote_snapshot: Option<Task<()>>,
-}
-
-pub struct RemoteWorktree {
- project_id: u64,
- snapshot: Snapshot,
- snapshot_rx: watch::Receiver<Snapshot>,
- client: Arc<Client>,
- updates_tx: postage::mpsc::Sender<proto::UpdateWorktree>,
- replica_id: ReplicaId,
- loading_buffers: LoadingBuffers,
- open_buffers: HashMap<usize, RemoteBuffer>,
- user_store: ModelHandle<UserStore>,
- queued_operations: Vec<(u64, Operation)>,
- diagnostic_summaries: TreeMap<PathKey, DiagnosticSummary>,
- weak: bool,
-}
-
-type LoadingBuffers = HashMap<
- Arc<Path>,
- postage::watch::Receiver<
- Option<Result<(ModelHandle<Buffer>, Arc<AtomicBool>), Arc<anyhow::Error>>>,
- >,
->;
-
-#[derive(Default, Deserialize)]
-struct WorktreeConfig {
- collaborators: Vec<String>,
-}
-
-impl LocalWorktree {
- async fn new(
- client: Arc<Client>,
- user_store: ModelHandle<UserStore>,
- path: impl Into<Arc<Path>>,
- weak: bool,
- fs: Arc<dyn Fs>,
- cx: &mut AsyncAppContext,
- ) -> Result<(ModelHandle<Worktree>, Sender<ScanState>)> {
- let abs_path = path.into();
- let path: Arc<Path> = Arc::from(Path::new(""));
- let next_entry_id = AtomicUsize::new(0);
-
- // After determining whether the root entry is a file or a directory, populate the
- // snapshot's "root name", which will be used for the purpose of fuzzy matching.
- let root_name = abs_path
- .file_name()
- .map_or(String::new(), |f| f.to_string_lossy().to_string());
- let root_char_bag = root_name.chars().map(|c| c.to_ascii_lowercase()).collect();
- let metadata = fs.metadata(&abs_path).await?;
-
- let mut config = WorktreeConfig::default();
- if let Ok(zed_toml) = fs.load(&abs_path.join(".zed.toml")).await {
- if let Ok(parsed) = toml::from_str(&zed_toml) {
- config = parsed;
- }
- }
+ let mut config = WorktreeConfig::default();
+ if let Ok(zed_toml) = fs.load(&abs_path.join(".zed.toml")).await {
+ if let Ok(parsed) = toml::from_str(&zed_toml) {
+ config = parsed;
+ }
+ }
let (scan_states_tx, scan_states_rx) = smol::channel::unbounded();
let (mut last_scan_state_tx, last_scan_state_rx) = watch::channel_with(ScanState::Scanning);
@@ -897,14 +502,10 @@ impl LocalWorktree {
registration: Registration::None,
share: None,
poll_task: None,
- loading_buffers: Default::default(),
- open_buffers: Default::default(),
- shared_buffers: Default::default(),
diagnostics: Default::default(),
diagnostic_summaries: Default::default(),
queued_operations: Default::default(),
client,
- user_store,
fs,
weak,
};
@@ -946,29 +547,7 @@ impl LocalWorktree {
self.config.collaborators.clone()
}
- fn get_open_buffer(
- &mut self,
- path: &Path,
- cx: &mut ModelContext<Worktree>,
- ) -> Option<ModelHandle<Buffer>> {
- let handle = cx.handle();
- let mut result = None;
- self.open_buffers.retain(|_buffer_id, buffer| {
- if let Some(buffer) = buffer.upgrade(cx) {
- if let Some(file) = File::from_dyn(buffer.read(cx).file()) {
- if file.worktree == handle && file.path().as_ref() == path {
- result = Some(buffer);
- }
- }
- true
- } else {
- false
- }
- });
- result
- }
-
- fn open_buffer(
+ pub(crate) fn load_buffer(
&mut self,
path: &Path,
cx: &mut ModelContext<Worktree>,
@@ -978,200 +557,20 @@ impl LocalWorktree {
let (file, contents) = this
.update(&mut cx, |t, cx| t.as_local().unwrap().load(&path, cx))
.await?;
-
- let diagnostics = this.update(&mut cx, |this, _| {
- this.as_local_mut().unwrap().diagnostics.get(&path).cloned()
- });
-
- let mut buffer_operations = Vec::new();
- let buffer = cx.add_model(|cx| {
- let mut buffer = Buffer::from_file(0, contents, Box::new(file), cx);
- if let Some(diagnostics) = diagnostics {
- let op = buffer.update_diagnostics(None, diagnostics, cx).unwrap();
- buffer_operations.push(op);
- }
- buffer
- });
-
- this.update(&mut cx, |this, cx| {
- for op in buffer_operations {
- this.send_buffer_update(buffer.read(cx).remote_id(), op, cx);
- }
- let this = this.as_local_mut().unwrap();
- this.open_buffers.insert(buffer.id(), buffer.downgrade());
- });
-
- Ok(buffer)
+ Ok(cx.add_model(|cx| Buffer::from_file(0, contents, Box::new(file), cx)))
})
}
- pub fn open_remote_buffer(
- &mut self,
- peer_id: PeerId,
- buffer: ModelHandle<Buffer>,
- cx: &mut ModelContext<Worktree>,
- ) -> proto::OpenBufferResponse {
- self.shared_buffers
- .entry(peer_id)
- .or_default()
- .insert(buffer.id() as u64, buffer.clone());
- proto::OpenBufferResponse {
- buffer: Some(buffer.update(cx.as_mut(), |buffer, _| buffer.to_proto())),
- }
- }
-
- pub fn close_remote_buffer(
- &mut self,
- envelope: TypedEnvelope<proto::CloseBuffer>,
- cx: &mut ModelContext<Worktree>,
- ) -> Result<()> {
- if let Some(shared_buffers) = self.shared_buffers.get_mut(&envelope.original_sender_id()?) {
- shared_buffers.remove(&envelope.payload.buffer_id);
- cx.notify();
- }
-
- Ok(())
- }
-
- pub fn remove_collaborator(
- &mut self,
- peer_id: PeerId,
- replica_id: ReplicaId,
- cx: &mut ModelContext<Worktree>,
- ) {
- self.shared_buffers.remove(&peer_id);
- for (_, buffer) in &self.open_buffers {
- if let Some(buffer) = buffer.upgrade(cx) {
- buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx));
- }
- }
- cx.notify();
+ pub fn diagnostics_for_path(&self, path: &Path) -> Option<Vec<DiagnosticEntry<PointUtf16>>> {
+ self.diagnostics.get(path).cloned()
}
pub fn update_diagnostics(
&mut self,
worktree_path: Arc<Path>,
- params: lsp::PublishDiagnosticsParams,
- disk_based_sources: &HashSet<String>,
- cx: &mut ModelContext<Worktree>,
- ) -> Result<()> {
- let mut next_group_id = 0;
- let mut diagnostics = Vec::default();
- let mut primary_diagnostic_group_ids = HashMap::default();
- let mut sources_by_group_id = HashMap::default();
- let mut supporting_diagnostic_severities = HashMap::default();
- for diagnostic in ¶ms.diagnostics {
- let source = diagnostic.source.as_ref();
- let code = diagnostic.code.as_ref().map(|code| match code {
- lsp::NumberOrString::Number(code) => code.to_string(),
- lsp::NumberOrString::String(code) => code.clone(),
- });
- let range = range_from_lsp(diagnostic.range);
- let is_supporting = diagnostic
- .related_information
- .as_ref()
- .map_or(false, |infos| {
- infos.iter().any(|info| {
- primary_diagnostic_group_ids.contains_key(&(
- source,
- code.clone(),
- range_from_lsp(info.location.range),
- ))
- })
- });
-
- if is_supporting {
- if let Some(severity) = diagnostic.severity {
- supporting_diagnostic_severities
- .insert((source, code.clone(), range), severity);
- }
- } else {
- let group_id = post_inc(&mut next_group_id);
- let is_disk_based =
- source.map_or(false, |source| disk_based_sources.contains(source));
-
- sources_by_group_id.insert(group_id, source);
- primary_diagnostic_group_ids
- .insert((source, code.clone(), range.clone()), group_id);
-
- diagnostics.push(DiagnosticEntry {
- range,
- diagnostic: Diagnostic {
- code: code.clone(),
- severity: diagnostic.severity.unwrap_or(DiagnosticSeverity::ERROR),
- message: diagnostic.message.clone(),
- group_id,
- is_primary: true,
- is_valid: true,
- is_disk_based,
- },
- });
- if let Some(infos) = &diagnostic.related_information {
- for info in infos {
- if info.location.uri == params.uri {
- let range = range_from_lsp(info.location.range);
- diagnostics.push(DiagnosticEntry {
- range,
- diagnostic: Diagnostic {
- code: code.clone(),
- severity: DiagnosticSeverity::INFORMATION,
- message: info.message.clone(),
- group_id,
- is_primary: false,
- is_valid: true,
- is_disk_based,
- },
- });
- }
- }
- }
- }
- }
-
- for entry in &mut diagnostics {
- let diagnostic = &mut entry.diagnostic;
- if !diagnostic.is_primary {
- let source = *sources_by_group_id.get(&diagnostic.group_id).unwrap();
- if let Some(&severity) = supporting_diagnostic_severities.get(&(
- source,
- diagnostic.code.clone(),
- entry.range.clone(),
- )) {
- diagnostic.severity = severity;
- }
- }
- }
-
- self.update_diagnostic_entries(worktree_path, params.version, diagnostics, cx)?;
- Ok(())
- }
-
- pub fn update_diagnostic_entries(
- &mut self,
- worktree_path: Arc<Path>,
- version: Option<i32>,
diagnostics: Vec<DiagnosticEntry<PointUtf16>>,
cx: &mut ModelContext<Worktree>,
) -> Result<()> {
- for buffer in self.open_buffers.values() {
- if let Some(buffer) = buffer.upgrade(cx) {
- if buffer
- .read(cx)
- .file()
- .map_or(false, |file| *file.path() == worktree_path)
- {
- let (remote_id, operation) = buffer.update(cx, |buffer, cx| {
- (
- buffer.remote_id(),
- buffer.update_diagnostics(version, diagnostics.clone(), cx),
- )
- });
- self.send_buffer_update(remote_id, operation?, cx);
- break;
- }
- }
- }
-
let summary = DiagnosticSummary::new(&diagnostics);
self.diagnostic_summaries
.insert(PathKey(worktree_path.clone()), summary.clone());
@@ -1207,40 +606,6 @@ impl LocalWorktree {
Ok(())
}
- fn send_buffer_update(
- &mut self,
- buffer_id: u64,
- operation: Operation,
- cx: &mut ModelContext<Worktree>,
- ) -> Option<()> {
- let share = self.share.as_ref()?;
- let project_id = share.project_id;
- let worktree_id = self.id();
- let rpc = self.client.clone();
- cx.spawn(|worktree, mut cx| async move {
- if let Err(error) = rpc
- .request(proto::UpdateBuffer {
- project_id,
- worktree_id: worktree_id.0 as u64,
- buffer_id,
- operations: vec![language::proto::serialize_operation(&operation)],
- })
- .await
- {
- worktree.update(&mut cx, |worktree, _| {
- log::error!("error sending buffer operation: {}", error);
- worktree
- .as_local_mut()
- .unwrap()
- .queued_operations
- .push((buffer_id, operation));
- });
- }
- })
- .detach();
- None
- }
-
pub fn scan_complete(&self) -> impl Future<Output = ()> {
let mut scan_state_rx = self.last_scan_state_rx.clone();
async move {
@@ -644,7 +644,7 @@ mod tests {
});
let (root1, _) = project
.update(&mut cx, |project, cx| {
- project.find_or_create_worktree_for_abs_path("/root1", false, cx)
+ project.find_or_create_local_worktree("/root1", false, cx)
})
.await
.unwrap();
@@ -653,7 +653,7 @@ mod tests {
.await;
let (root2, _) = project
.update(&mut cx, |project, cx| {
- project.find_or_create_worktree_for_abs_path("/root2", false, cx)
+ project.find_or_create_local_worktree("/root2", false, cx)
})
.await
.unwrap();
@@ -144,35 +144,30 @@ message OpenBufferResponse {
message CloseBuffer {
uint64 project_id = 1;
- uint64 worktree_id = 2;
- uint64 buffer_id = 3;
+ uint64 buffer_id = 2;
}
message UpdateBuffer {
uint64 project_id = 1;
- uint64 worktree_id = 2;
- uint64 buffer_id = 3;
+ uint64 buffer_id = 2;
repeated Operation operations = 4;
}
message SaveBuffer {
uint64 project_id = 1;
- uint64 worktree_id = 2;
- uint64 buffer_id = 3;
+ uint64 buffer_id = 2;
}
message BufferSaved {
uint64 project_id = 1;
- uint64 worktree_id = 2;
- uint64 buffer_id = 3;
- repeated VectorClockEntry version = 4;
- Timestamp mtime = 5;
+ uint64 buffer_id = 2;
+ repeated VectorClockEntry version = 3;
+ Timestamp mtime = 4;
}
message FormatBuffer {
uint64 project_id = 1;
- uint64 worktree_id = 2;
- uint64 buffer_id = 3;
+ uint64 buffer_id = 2;
}
message UpdateDiagnosticSummary {
@@ -1162,18 +1162,17 @@ mod tests {
});
let (worktree_a, _) = project_a
.update(&mut cx_a, |p, cx| {
- p.find_or_create_worktree_for_abs_path("/a", false, cx)
+ p.find_or_create_local_worktree("/a", false, cx)
})
.await
.unwrap();
+ let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
worktree_a
.read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
.await;
- let project_id = project_a
- .update(&mut cx_a, |project, _| project.next_remote_id())
- .await;
+ let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
project_a
- .update(&mut cx_a, |project, cx| project.share(cx))
+ .update(&mut cx_a, |p, cx| p.share(cx))
.await
.unwrap();
@@ -1188,7 +1187,6 @@ mod tests {
)
.await
.unwrap();
- let worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
let replica_id_b = project_b.read_with(&cx_b, |project, _| {
assert_eq!(
@@ -1214,25 +1212,26 @@ mod tests {
.await;
// Open the same file as client B and client A.
- let buffer_b = worktree_b
- .update(&mut cx_b, |worktree, cx| worktree.open_buffer("b.txt", cx))
+ let buffer_b = project_b
+ .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
.await
- .unwrap()
- .0;
+ .unwrap();
let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
buffer_b.read_with(&cx_b, |buf, cx| {
assert_eq!(buf.read(cx).text(), "b-contents")
});
- worktree_a.read_with(&cx_a, |tree, cx| assert!(tree.has_open_buffer("b.txt", cx)));
- let buffer_a = worktree_a
- .update(&mut cx_a, |tree, cx| tree.open_buffer("b.txt", cx))
+ project_a.read_with(&cx_a, |project, cx| {
+ assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
+ });
+ let buffer_a = project_a
+ .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
.await
- .unwrap()
- .0;
+ .unwrap();
let editor_b = cx_b.add_view(window_b, |cx| {
Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), cx)
});
+
// TODO
// // Create a selection set as client B and see that selection set as client A.
// buffer_a
@@ -1256,8 +1255,10 @@ mod tests {
// Close the buffer as client A, see that the buffer is closed.
cx_a.update(move |_| drop(buffer_a));
- worktree_a
- .condition(&cx_a, |tree, cx| !tree.has_open_buffer("b.txt", cx))
+ project_a
+ .condition(&cx_a, |project, cx| {
+ !project.has_open_buffer((worktree_id, "b.txt"), cx)
+ })
.await;
// Dropping the client B's project removes client B from client A's collaborators.
@@ -1299,18 +1300,17 @@ mod tests {
});
let (worktree_a, _) = project_a
.update(&mut cx_a, |p, cx| {
- p.find_or_create_worktree_for_abs_path("/a", false, cx)
+ p.find_or_create_local_worktree("/a", false, cx)
})
.await
.unwrap();
worktree_a
.read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
.await;
- let project_id = project_a
- .update(&mut cx_a, |project, _| project.next_remote_id())
- .await;
+ let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
+ let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
project_a
- .update(&mut cx_a, |project, cx| project.share(cx))
+ .update(&mut cx_a, |p, cx| p.share(cx))
.await
.unwrap();
assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
@@ -1326,13 +1326,12 @@ mod tests {
)
.await
.unwrap();
-
- let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
- worktree_b
- .update(&mut cx_b, |tree, cx| tree.open_buffer("a.txt", cx))
+ project_b
+ .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
.await
.unwrap();
+ // Unshare the project as client A
project_a
.update(&mut cx_a, |project, cx| project.unshare(cx))
.await
@@ -1349,6 +1348,7 @@ mod tests {
.await
.unwrap();
assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
+
let project_c = Project::remote(
project_id,
client_b.clone(),
@@ -1359,9 +1359,8 @@ mod tests {
)
.await
.unwrap();
- let worktree_c = project_c.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
- worktree_c
- .update(&mut cx_b, |tree, cx| tree.open_buffer("a.txt", cx))
+ project_c
+ .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
.await
.unwrap();
}
@@ -1403,18 +1402,17 @@ mod tests {
});
let (worktree_a, _) = project_a
.update(&mut cx_a, |p, cx| {
- p.find_or_create_worktree_for_abs_path("/a", false, cx)
+ p.find_or_create_local_worktree("/a", false, cx)
})
.await
.unwrap();
worktree_a
.read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
.await;
- let project_id = project_a
- .update(&mut cx_a, |project, _| project.next_remote_id())
- .await;
+ let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
+ let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
project_a
- .update(&mut cx_a, |project, cx| project.share(cx))
+ .update(&mut cx_a, |p, cx| p.share(cx))
.await
.unwrap();
@@ -1439,29 +1437,26 @@ mod tests {
)
.await
.unwrap();
-
- // Open and edit a buffer as both guests B and C.
let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());
- let buffer_b = worktree_b
- .update(&mut cx_b, |tree, cx| tree.open_buffer("file1", cx))
+
+ // Open and edit a buffer as both guests B and C.
+ let buffer_b = project_b
+ .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
.await
- .unwrap()
- .0;
- let buffer_c = worktree_c
- .update(&mut cx_c, |tree, cx| tree.open_buffer("file1", cx))
+ .unwrap();
+ let buffer_c = project_c
+ .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
.await
- .unwrap()
- .0;
+ .unwrap();
buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
// Open and edit that buffer as the host.
- let buffer_a = worktree_a
- .update(&mut cx_a, |tree, cx| tree.open_buffer("file1", cx))
+ let buffer_a = project_a
+ .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
.await
- .unwrap()
- .0;
+ .unwrap();
buffer_a
.condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
@@ -1475,7 +1470,9 @@ mod tests {
.condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
.await;
buffer_b
- .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
+ .condition(&mut cx_b, |buf, _| {
+ dbg!(buf.text()) == "i-am-c, i-am-b, i-am-a"
+ })
.await;
buffer_c
.condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
@@ -1557,18 +1554,17 @@ mod tests {
});
let (worktree_a, _) = project_a
.update(&mut cx_a, |p, cx| {
- p.find_or_create_worktree_for_abs_path("/dir", false, cx)
+ p.find_or_create_local_worktree("/dir", false, cx)
})
.await
.unwrap();
worktree_a
.read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
.await;
- let project_id = project_a
- .update(&mut cx_a, |project, _| project.next_remote_id())
- .await;
+ let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
+ let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
project_a
- .update(&mut cx_a, |project, cx| project.share(cx))
+ .update(&mut cx_a, |p, cx| p.share(cx))
.await
.unwrap();
@@ -1586,11 +1582,10 @@ mod tests {
let worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
// Open a buffer as client B
- let buffer_b = worktree_b
- .update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx))
+ let buffer_b = project_b
+ .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
.await
- .unwrap()
- .0;
+ .unwrap();
let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());
buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
@@ -1654,18 +1649,17 @@ mod tests {
});
let (worktree_a, _) = project_a
.update(&mut cx_a, |p, cx| {
- p.find_or_create_worktree_for_abs_path("/dir", false, cx)
+ p.find_or_create_local_worktree("/dir", false, cx)
})
.await
.unwrap();
worktree_a
.read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
.await;
- let project_id = project_a
- .update(&mut cx_a, |project, _| project.next_remote_id())
- .await;
+ let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
+ let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
project_a
- .update(&mut cx_a, |project, cx| project.share(cx))
+ .update(&mut cx_a, |p, cx| p.share(cx))
.await
.unwrap();
@@ -1680,26 +1674,24 @@ mod tests {
)
.await
.unwrap();
- let worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
// Open a buffer as client A
- let buffer_a = worktree_a
- .update(&mut cx_a, |tree, cx| tree.open_buffer("a.txt", cx))
+ let buffer_a = project_a
+ .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
.await
- .unwrap()
- .0;
+ .unwrap();
// Start opening the same buffer as client B
let buffer_b = cx_b
.background()
- .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
+ .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
task::yield_now().await;
// Edit the buffer as client A while client B is still opening it.
buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
- let buffer_b = buffer_b.await.unwrap().0;
+ let buffer_b = buffer_b.await.unwrap();
buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
}
@@ -1737,18 +1729,17 @@ mod tests {
});
let (worktree_a, _) = project_a
.update(&mut cx_a, |p, cx| {
- p.find_or_create_worktree_for_abs_path("/dir", false, cx)
+ p.find_or_create_local_worktree("/dir", false, cx)
})
.await
.unwrap();
worktree_a
.read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
.await;
- let project_id = project_a
- .update(&mut cx_a, |project, _| project.next_remote_id())
- .await;
+ let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
+ let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
project_a
- .update(&mut cx_a, |project, cx| project.share(cx))
+ .update(&mut cx_a, |p, cx| p.share(cx))
.await
.unwrap();
@@ -1763,7 +1754,6 @@ mod tests {
)
.await
.unwrap();
- let worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
// See that a guest has joined as client A.
project_a
@@ -1773,7 +1763,7 @@ mod tests {
// Begin opening a buffer as client B, but leave the project before the open completes.
let buffer_b = cx_b
.background()
- .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.txt", cx)));
+ .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
cx_b.update(|_| drop(project_b));
drop(buffer_b);
@@ -1815,7 +1805,7 @@ mod tests {
});
let (worktree_a, _) = project_a
.update(&mut cx_a, |p, cx| {
- p.find_or_create_worktree_for_abs_path("/a", false, cx)
+ p.find_or_create_local_worktree("/a", false, cx)
})
.await
.unwrap();
@@ -1904,19 +1894,17 @@ mod tests {
});
let (worktree_a, _) = project_a
.update(&mut cx_a, |p, cx| {
- p.find_or_create_worktree_for_abs_path("/a", false, cx)
+ p.find_or_create_local_worktree("/a", false, cx)
})
.await
.unwrap();
worktree_a
.read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
.await;
- let project_id = project_a
- .update(&mut cx_a, |project, _| project.next_remote_id())
- .await;
+ let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
project_a
- .update(&mut cx_a, |project, cx| project.share(cx))
+ .update(&mut cx_a, |p, cx| p.share(cx))
.await
.unwrap();
@@ -2041,13 +2029,11 @@ mod tests {
.await;
// Open the file with the errors on client B. They should be present.
- let worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
let buffer_b = cx_b
.background()
- .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.rs", cx)))
+ .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
.await
- .unwrap()
- .0;
+ .unwrap();
buffer_b.read_with(&cx_b, |buffer, _| {
assert_eq!(
@@ -2128,18 +2114,17 @@ mod tests {
});
let (worktree_a, _) = project_a
.update(&mut cx_a, |p, cx| {
- p.find_or_create_worktree_for_abs_path("/a", false, cx)
+ p.find_or_create_local_worktree("/a", false, cx)
})
.await
.unwrap();
worktree_a
.read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
.await;
- let project_id = project_a
- .update(&mut cx_a, |project, _| project.next_remote_id())
- .await;
+ let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
+ let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
project_a
- .update(&mut cx_a, |project, cx| project.share(cx))
+ .update(&mut cx_a, |p, cx| p.share(cx))
.await
.unwrap();
@@ -2156,13 +2141,11 @@ mod tests {
.unwrap();
// Open the file to be formatted on client B.
- let worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
let buffer_b = cx_b
.background()
- .spawn(worktree_b.update(&mut cx_b, |worktree, cx| worktree.open_buffer("a.rs", cx)))
+ .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
.await
- .unwrap()
- .0;
+ .unwrap();
let format = buffer_b.update(&mut cx_b, |buffer, cx| buffer.format(cx));
let (request_id, _) = fake_language_server
@@ -2638,7 +2621,7 @@ mod tests {
});
let (worktree_a, _) = project_a
.update(&mut cx_a, |p, cx| {
- p.find_or_create_worktree_for_abs_path("/a", false, cx)
+ p.find_or_create_local_worktree("/a", false, cx)
})
.await
.unwrap();
@@ -723,7 +723,7 @@ impl Workspace {
cx: &mut ViewContext<Self>,
) -> Task<Result<ProjectPath>> {
let entry = self.project().update(cx, |project, cx| {
- project.find_or_create_worktree_for_abs_path(abs_path, false, cx)
+ project.find_or_create_local_worktree(abs_path, false, cx)
});
cx.spawn(|_, cx| async move {
let (worktree, path) = entry.await?;
@@ -245,7 +245,7 @@ mod tests {
params
.project
.update(&mut cx, |project, cx| {
- project.find_or_create_worktree_for_abs_path(Path::new("/root"), false, cx)
+ project.find_or_create_local_worktree("/root", false, cx)
})
.await
.unwrap();
@@ -358,7 +358,7 @@ mod tests {
params
.project
.update(&mut cx, |project, cx| {
- project.find_or_create_worktree_for_abs_path(Path::new("/dir1"), false, cx)
+ project.find_or_create_local_worktree("/dir1", false, cx)
})
.await
.unwrap();
@@ -425,7 +425,7 @@ mod tests {
params
.project
.update(&mut cx, |project, cx| {
- project.find_or_create_worktree_for_abs_path(Path::new("/root"), false, cx)
+ project.find_or_create_local_worktree("/root", false, cx)
})
.await
.unwrap();
@@ -474,7 +474,7 @@ mod tests {
params
.project
.update(&mut cx, |project, cx| {
- project.find_or_create_worktree_for_abs_path(Path::new("/root"), false, cx)
+ project.find_or_create_local_worktree("/root", false, cx)
})
.await
.unwrap();
@@ -626,7 +626,7 @@ mod tests {
params
.project
.update(&mut cx, |project, cx| {
- project.find_or_create_worktree_for_abs_path(Path::new("/root"), false, cx)
+ project.find_or_create_local_worktree("/root", false, cx)
})
.await
.unwrap();
@@ -689,7 +689,7 @@ mod tests {
params
.project
.update(&mut cx, |project, cx| {
- project.find_or_create_worktree_for_abs_path(Path::new("/root"), false, cx)
+ project.find_or_create_local_worktree("/root", false, cx)
})
.await
.unwrap();