From c4e0f5e0ee83e02567a5512b0f5fafef49225e66 Mon Sep 17 00:00:00 2001 From: Conrad Irwin Date: Tue, 24 Sep 2024 15:52:30 -0600 Subject: [PATCH] Rebuild buffer store to be aware of remote/local distinction (#18303) Release Notes: - N/A --------- Co-authored-by: Mikayla --- .../remote_editing_collaboration_tests.rs | 27 +- crates/project/src/buffer_store.rs | 1981 ++++++++++------- crates/project/src/lsp_command.rs | 23 +- crates/project/src/lsp_store.rs | 33 +- crates/project/src/project.rs | 18 +- .../remote_server/src/remote_editing_tests.rs | 1 + 6 files changed, 1192 insertions(+), 891 deletions(-) diff --git a/crates/collab/src/tests/remote_editing_collaboration_tests.rs b/crates/collab/src/tests/remote_editing_collaboration_tests.rs index a81166bb00ceecd46370d8cae230115b98403d4e..bad5ef9053ce703a66daf37834dd93e57e27b11e 100644 --- a/crates/collab/src/tests/remote_editing_collaboration_tests.rs +++ b/crates/collab/src/tests/remote_editing_collaboration_tests.rs @@ -3,6 +3,7 @@ use call::ActiveCall; use fs::{FakeFs, Fs as _}; use gpui::{Context as _, TestAppContext}; use language::language_settings::all_language_settings; +use project::ProjectPath; use remote::SshSession; use remote_server::HeadlessProject; use serde_json::json; @@ -108,14 +109,36 @@ async fn test_sharing_an_ssh_remote_project( }); project_b - .update(cx_b, |project, cx| project.save_buffer(buffer_b, cx)) + .update(cx_b, |project, cx| { + project.save_buffer_as( + buffer_b.clone(), + ProjectPath { + worktree_id: worktree_id.to_owned(), + path: Arc::from(Path::new("src/renamed.rs")), + }, + cx, + ) + }) .await .unwrap(); assert_eq!( remote_fs - .load("/code/project1/src/lib.rs".as_ref()) + .load("/code/project1/src/renamed.rs".as_ref()) .await .unwrap(), "fn one() -> usize { 100 }" ); + cx_b.run_until_parked(); + cx_b.update(|cx| { + assert_eq!( + buffer_b + .read(cx) + .file() + .unwrap() + .path() + .to_string_lossy() + .to_string(), + "src/renamed.rs".to_string() + ); + }); } diff --git a/crates/project/src/buffer_store.rs b/crates/project/src/buffer_store.rs index b69679d6ac3b3acbca996e5d19c6c92d98174df0..aa86a8f7e256e8ee4f79517fdfb2d5d5bfe311e1 100644 --- a/crates/project/src/buffer_store.rs +++ b/crates/project/src/buffer_store.rs @@ -10,7 +10,8 @@ use fs::Fs; use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt}; use git::blame::Blame; use gpui::{ - AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Task, WeakModel, + AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Subscription, + Task, WeakModel, }; use http_client::Url; use language::{ @@ -25,27 +26,72 @@ use smol::channel::Receiver; use std::{io, path::Path, str::FromStr as _, sync::Arc, time::Instant}; use text::BufferId; use util::{debug_panic, maybe, ResultExt as _, TryFutureExt}; -use worktree::{ - File, PathChange, ProjectEntryId, RemoteWorktree, UpdatedGitRepositoriesSet, Worktree, - WorktreeId, -}; +use worktree::{File, PathChange, ProjectEntryId, UpdatedGitRepositoriesSet, Worktree, WorktreeId}; -/// A set of open buffers. -pub struct BufferStore { - state: BufferStoreState, - downstream_client: Option<(AnyProtoClient, u64)>, +trait BufferStoreImpl { + fn open_buffer( + &self, + path: Arc, + worktree: Model, + cx: &mut ModelContext, + ) -> Task>>; + + fn save_buffer( + &self, + buffer: Model, + cx: &mut ModelContext, + ) -> Task>; + + fn save_buffer_as( + &self, + buffer: Model, + path: ProjectPath, + cx: &mut ModelContext, + ) -> Task>; + + fn create_buffer(&self, cx: &mut ModelContext) -> Task>>; + + fn reload_buffers( + &self, + buffers: Vec>, + push_to_history: bool, + cx: &mut ModelContext, + ) -> Task>; + + fn as_remote(&self) -> Option>; + fn as_local(&self) -> Option>; +} + +struct RemoteBufferStore { + shared_with_me: HashSet>, + upstream_client: AnyProtoClient, + project_id: u64, + loading_remote_buffers_by_id: HashMap>, + remote_buffer_listeners: + HashMap, anyhow::Error>>>>, worktree_store: Model, - opened_buffers: HashMap, + buffer_store: WeakModel, +} + +struct LocalBufferStore { local_buffer_ids_by_path: HashMap, local_buffer_ids_by_entry_id: HashMap, + buffer_store: WeakModel, + worktree_store: Model, + _subscription: Subscription, +} + +/// A set of open buffers. +pub struct BufferStore { + state: Box, #[allow(clippy::type_complexity)] loading_buffers_by_path: HashMap< ProjectPath, postage::watch::Receiver, Arc>>>, >, - loading_remote_buffers_by_id: HashMap>, - remote_buffer_listeners: - HashMap, anyhow::Error>>>>, + worktree_store: Model, + opened_buffers: HashMap, + downstream_client: Option<(AnyProtoClient, u64)>, shared_buffers: HashMap>>, } @@ -63,19 +109,858 @@ pub enum BufferStoreEvent { }, } -enum BufferStoreState { - Remote { - shared_with_me: HashSet>, - upstream_client: AnyProtoClient, - project_id: u64, - }, - Local {}, +#[derive(Default, Debug)] +pub struct ProjectTransaction(pub HashMap, language::Transaction>); + +impl EventEmitter for BufferStore {} + +impl RemoteBufferStore { + pub fn wait_for_remote_buffer( + &mut self, + id: BufferId, + cx: &mut AppContext, + ) -> Task>> { + let buffer_store = self.buffer_store.clone(); + let (tx, rx) = oneshot::channel(); + self.remote_buffer_listeners.entry(id).or_default().push(tx); + + cx.spawn(|cx| async move { + if let Some(buffer) = buffer_store + .read_with(&cx, |buffer_store, _| buffer_store.get(id)) + .ok() + .flatten() + { + return Ok(buffer); + } + + cx.background_executor() + .spawn(async move { rx.await? }) + .await + }) + } + + fn save_remote_buffer( + &self, + buffer_handle: Model, + new_path: Option, + cx: &ModelContext, + ) -> Task> { + let buffer = buffer_handle.read(cx); + let buffer_id = buffer.remote_id().into(); + let version = buffer.version(); + let rpc = self.upstream_client.clone(); + let project_id = self.project_id; + cx.spawn(move |_, mut cx| async move { + let response = rpc + .request(proto::SaveBuffer { + project_id, + buffer_id, + new_path, + version: serialize_version(&version), + }) + .await?; + let version = deserialize_version(&response.version); + let mtime = response.mtime.map(|mtime| mtime.into()); + + buffer_handle.update(&mut cx, |buffer, cx| { + buffer.did_save(version.clone(), mtime, cx); + })?; + + Ok(()) + }) + } + + pub fn handle_create_buffer_for_peer( + &mut self, + envelope: TypedEnvelope, + replica_id: u16, + capability: Capability, + cx: &mut ModelContext, + ) -> Result>> { + match envelope + .payload + .variant + .ok_or_else(|| anyhow!("missing variant"))? + { + proto::create_buffer_for_peer::Variant::State(mut state) => { + let buffer_id = BufferId::new(state.id)?; + + let buffer_result = maybe!({ + let mut buffer_file = None; + if let Some(file) = state.file.take() { + let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id); + let worktree = self + .worktree_store + .read(cx) + .worktree_for_id(worktree_id, cx) + .ok_or_else(|| { + anyhow!("no worktree found for id {}", file.worktree_id) + })?; + buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?) + as Arc); + } + Buffer::from_proto(replica_id, capability, state, buffer_file) + }); + + match buffer_result { + Ok(buffer) => { + let buffer = cx.new_model(|_| buffer); + self.loading_remote_buffers_by_id.insert(buffer_id, buffer); + } + Err(error) => { + if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) { + for listener in listeners { + listener.send(Err(anyhow!(error.cloned()))).ok(); + } + } + } + } + } + proto::create_buffer_for_peer::Variant::Chunk(chunk) => { + let buffer_id = BufferId::new(chunk.buffer_id)?; + let buffer = self + .loading_remote_buffers_by_id + .get(&buffer_id) + .cloned() + .ok_or_else(|| { + anyhow!( + "received chunk for buffer {} without initial state", + chunk.buffer_id + ) + })?; + + let result = maybe!({ + let operations = chunk + .operations + .into_iter() + .map(language::proto::deserialize_operation) + .collect::>>()?; + buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx)); + anyhow::Ok(()) + }); + + if let Err(error) = result { + self.loading_remote_buffers_by_id.remove(&buffer_id); + if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) { + for listener in listeners { + listener.send(Err(error.cloned())).ok(); + } + } + } else if chunk.is_last { + self.loading_remote_buffers_by_id.remove(&buffer_id); + if self.upstream_client.is_via_collab() { + // retain buffers sent by peers to avoid races. + self.shared_with_me.insert(buffer.clone()); + } + + if let Some(senders) = self.remote_buffer_listeners.remove(&buffer_id) { + for sender in senders { + sender.send(Ok(buffer.clone())).ok(); + } + } + return Ok(Some(buffer)); + } + } + } + return Ok(None); + } + + pub fn incomplete_buffer_ids(&self) -> Vec { + self.loading_remote_buffers_by_id + .keys() + .copied() + .collect::>() + } + + pub fn deserialize_project_transaction( + &self, + message: proto::ProjectTransaction, + push_to_history: bool, + cx: &mut ModelContext, + ) -> Task> { + cx.spawn(|this, mut cx| async move { + let mut project_transaction = ProjectTransaction::default(); + for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions) + { + let buffer_id = BufferId::new(buffer_id)?; + let buffer = this + .update(&mut cx, |this, cx| { + this.wait_for_remote_buffer(buffer_id, cx) + })? + .await?; + let transaction = language::proto::deserialize_transaction(transaction)?; + project_transaction.0.insert(buffer, transaction); + } + + for (buffer, transaction) in &project_transaction.0 { + buffer + .update(&mut cx, |buffer, _| { + buffer.wait_for_edits(transaction.edit_ids.iter().copied()) + })? + .await?; + + if push_to_history { + buffer.update(&mut cx, |buffer, _| { + buffer.push_transaction(transaction.clone(), Instant::now()); + })?; + } + } + + Ok(project_transaction) + }) + } } -#[derive(Default, Debug)] -pub struct ProjectTransaction(pub HashMap, language::Transaction>); +impl BufferStoreImpl for Model { + fn as_remote(&self) -> Option> { + Some(self.clone()) + } + + fn as_local(&self) -> Option> { + None + } + + fn save_buffer( + &self, + buffer: Model, + cx: &mut ModelContext, + ) -> Task> { + self.update(cx, |this, cx| { + this.save_remote_buffer(buffer.clone(), None, cx) + }) + } + fn save_buffer_as( + &self, + buffer: Model, + path: ProjectPath, + cx: &mut ModelContext, + ) -> Task> { + self.update(cx, |this, cx| { + this.save_remote_buffer(buffer, Some(path.to_proto()), cx) + }) + } + + fn open_buffer( + &self, + path: Arc, + worktree: Model, + cx: &mut ModelContext, + ) -> Task>> { + self.update(cx, |this, cx| { + let worktree_id = worktree.read(cx).id().to_proto(); + let project_id = this.project_id; + let client = this.upstream_client.clone(); + let path_string = path.clone().to_string_lossy().to_string(); + cx.spawn(move |this, mut cx| async move { + let response = client + .request(proto::OpenBufferByPath { + project_id, + worktree_id, + path: path_string, + }) + .await?; + let buffer_id = BufferId::new(response.buffer_id)?; + + let buffer = this + .update(&mut cx, { + |this, cx| this.wait_for_remote_buffer(buffer_id, cx) + })? + .await?; + + Ok(buffer) + }) + }) + } + + fn create_buffer(&self, cx: &mut ModelContext) -> Task>> { + self.update(cx, |this, cx| { + let create = this.upstream_client.request(proto::OpenNewBuffer { + project_id: this.project_id, + }); + cx.spawn(|this, mut cx| async move { + let response = create.await?; + let buffer_id = BufferId::new(response.buffer_id)?; + + this.update(&mut cx, |this, cx| { + this.wait_for_remote_buffer(buffer_id, cx) + })? + .await + }) + }) + } + + fn reload_buffers( + &self, + buffers: Vec>, + push_to_history: bool, + cx: &mut ModelContext, + ) -> Task> { + self.update(cx, |this, cx| { + let request = this.upstream_client.request(proto::ReloadBuffers { + project_id: this.project_id, + buffer_ids: buffers + .iter() + .map(|buffer| buffer.read(cx).remote_id().to_proto()) + .collect(), + }); + + cx.spawn(|this, mut cx| async move { + let response = request + .await? + .transaction + .ok_or_else(|| anyhow!("missing transaction"))?; + this.update(&mut cx, |this, cx| { + this.deserialize_project_transaction(response, push_to_history, cx) + })? + .await + }) + }) + } +} + +impl LocalBufferStore { + fn save_local_buffer( + &self, + buffer_handle: Model, + worktree: Model, + path: Arc, + mut has_changed_file: bool, + cx: &mut ModelContext, + ) -> Task> { + let buffer = buffer_handle.read(cx); + + let text = buffer.as_rope().clone(); + let line_ending = buffer.line_ending(); + let version = buffer.version(); + let buffer_id = buffer.remote_id(); + if buffer.file().is_some_and(|file| !file.is_created()) { + has_changed_file = true; + } + + let save = worktree.update(cx, |worktree, cx| { + worktree.write_file(path.as_ref(), text, line_ending, cx) + }); + + cx.spawn(move |this, mut cx| async move { + let new_file = save.await?; + let mtime = new_file.mtime; + this.update(&mut cx, |this, cx| { + if let Some((downstream_client, project_id)) = this.downstream_client(cx) { + if has_changed_file { + downstream_client + .send(proto::UpdateBufferFile { + project_id, + buffer_id: buffer_id.to_proto(), + file: Some(language::File::to_proto(&*new_file, cx)), + }) + .log_err(); + } + downstream_client + .send(proto::BufferSaved { + project_id, + buffer_id: buffer_id.to_proto(), + version: serialize_version(&version), + mtime: mtime.map(|time| time.into()), + }) + .log_err(); + } + })?; + buffer_handle.update(&mut cx, |buffer, cx| { + if has_changed_file { + buffer.file_updated(new_file, cx); + } + buffer.did_save(version.clone(), mtime, cx); + }) + }) + } + + fn subscribe_to_worktree(&mut self, worktree: &Model, cx: &mut ModelContext) { + cx.subscribe(worktree, |this, worktree, event, cx| { + if worktree.read(cx).is_local() { + match event { + worktree::Event::UpdatedEntries(changes) => { + this.local_worktree_entries_changed(&worktree, changes, cx); + } + worktree::Event::UpdatedGitRepositories(updated_repos) => { + this.local_worktree_git_repos_changed(worktree.clone(), updated_repos, cx) + } + _ => {} + } + } + }) + .detach(); + } + + fn local_worktree_entries_changed( + &mut self, + worktree_handle: &Model, + changes: &[(Arc, ProjectEntryId, PathChange)], + cx: &mut ModelContext, + ) { + let snapshot = worktree_handle.read(cx).snapshot(); + for (path, entry_id, _) in changes { + self.local_worktree_entry_changed(*entry_id, path, worktree_handle, &snapshot, cx); + } + } + + fn local_worktree_git_repos_changed( + &mut self, + worktree_handle: Model, + changed_repos: &UpdatedGitRepositoriesSet, + cx: &mut ModelContext, + ) { + debug_assert!(worktree_handle.read(cx).is_local()); + let Some(buffer_store) = self.buffer_store.upgrade() else { + return; + }; + + // Identify the loading buffers whose containing repository that has changed. + let (future_buffers, current_buffers) = buffer_store.update(cx, |buffer_store, cx| { + let future_buffers = buffer_store + .loading_buffers() + .filter_map(|(project_path, receiver)| { + if project_path.worktree_id != worktree_handle.read(cx).id() { + return None; + } + let path = &project_path.path; + changed_repos + .iter() + .find(|(work_dir, _)| path.starts_with(work_dir))?; + let path = path.clone(); + Some(async move { + BufferStore::wait_for_loading_buffer(receiver) + .await + .ok() + .map(|buffer| (buffer, path)) + }) + }) + .collect::>(); + + // Identify the current buffers whose containing repository has changed. + let current_buffers = buffer_store + .buffers() + .filter_map(|buffer| { + let file = File::from_dyn(buffer.read(cx).file())?; + if file.worktree != worktree_handle { + return None; + } + changed_repos + .iter() + .find(|(work_dir, _)| file.path.starts_with(work_dir))?; + Some((buffer, file.path.clone())) + }) + .collect::>(); + (future_buffers, current_buffers) + }); + + if future_buffers.len() + current_buffers.len() == 0 { + return; + } + + cx.spawn(move |this, mut cx| async move { + // Wait for all of the buffers to load. + let future_buffers = future_buffers.collect::>().await; + + // Reload the diff base for every buffer whose containing git repository has changed. + let snapshot = + worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?; + let diff_bases_by_buffer = cx + .background_executor() + .spawn(async move { + let mut diff_base_tasks = future_buffers + .into_iter() + .flatten() + .chain(current_buffers) + .filter_map(|(buffer, path)| { + let (repo_entry, local_repo_entry) = snapshot.repo_for_path(&path)?; + let relative_path = repo_entry.relativize(&snapshot, &path).ok()?; + Some(async move { + let base_text = + local_repo_entry.repo().load_index_text(&relative_path); + Some((buffer, base_text)) + }) + }) + .collect::>(); + + let mut diff_bases = Vec::with_capacity(diff_base_tasks.len()); + while let Some(diff_base) = diff_base_tasks.next().await { + if let Some(diff_base) = diff_base { + diff_bases.push(diff_base); + } + } + diff_bases + }) + .await; + + this.update(&mut cx, |this, cx| { + // Assign the new diff bases on all of the buffers. + for (buffer, diff_base) in diff_bases_by_buffer { + let buffer_id = buffer.update(cx, |buffer, cx| { + buffer.set_diff_base(diff_base.clone(), cx); + buffer.remote_id().to_proto() + }); + if let Some((client, project_id)) = &this.downstream_client(cx) { + client + .send(proto::UpdateDiffBase { + project_id: *project_id, + buffer_id, + diff_base, + }) + .log_err(); + } + } + }) + }) + .detach_and_log_err(cx); + } + + fn local_worktree_entry_changed( + &mut self, + entry_id: ProjectEntryId, + path: &Arc, + worktree: &Model, + snapshot: &worktree::Snapshot, + cx: &mut ModelContext, + ) -> Option<()> { + let project_path = ProjectPath { + worktree_id: snapshot.id(), + path: path.clone(), + }; + let buffer_id = match self.local_buffer_ids_by_entry_id.get(&entry_id) { + Some(&buffer_id) => buffer_id, + None => self.local_buffer_ids_by_path.get(&project_path).copied()?, + }; + let buffer = self + .buffer_store + .update(cx, |buffer_store, _| { + if let Some(buffer) = buffer_store.get(buffer_id) { + Some(buffer) + } else { + buffer_store.opened_buffers.remove(&buffer_id); + None + } + }) + .ok() + .flatten(); + let buffer = if let Some(buffer) = buffer { + buffer + } else { + self.local_buffer_ids_by_path.remove(&project_path); + self.local_buffer_ids_by_entry_id.remove(&entry_id); + return None; + }; + + let events = buffer.update(cx, |buffer, cx| { + let file = buffer.file()?; + let old_file = File::from_dyn(Some(file))?; + if old_file.worktree != *worktree { + return None; + } + + let new_file = if let Some(entry) = old_file + .entry_id + .and_then(|entry_id| snapshot.entry_for_id(entry_id)) + { + File { + is_local: true, + entry_id: Some(entry.id), + mtime: entry.mtime, + path: entry.path.clone(), + worktree: worktree.clone(), + is_deleted: false, + is_private: entry.is_private, + } + } else if let Some(entry) = snapshot.entry_for_path(old_file.path.as_ref()) { + File { + is_local: true, + entry_id: Some(entry.id), + mtime: entry.mtime, + path: entry.path.clone(), + worktree: worktree.clone(), + is_deleted: false, + is_private: entry.is_private, + } + } else { + File { + is_local: true, + entry_id: old_file.entry_id, + path: old_file.path.clone(), + mtime: old_file.mtime, + worktree: worktree.clone(), + is_deleted: true, + is_private: old_file.is_private, + } + }; + + if new_file == *old_file { + return None; + } + + let mut events = Vec::new(); + if new_file.path != old_file.path { + self.local_buffer_ids_by_path.remove(&ProjectPath { + path: old_file.path.clone(), + worktree_id: old_file.worktree_id(cx), + }); + self.local_buffer_ids_by_path.insert( + ProjectPath { + worktree_id: new_file.worktree_id(cx), + path: new_file.path.clone(), + }, + buffer_id, + ); + events.push(BufferStoreEvent::BufferChangedFilePath { + buffer: cx.handle(), + old_file: buffer.file().cloned(), + }); + } + + if new_file.entry_id != old_file.entry_id { + if let Some(entry_id) = old_file.entry_id { + self.local_buffer_ids_by_entry_id.remove(&entry_id); + } + if let Some(entry_id) = new_file.entry_id { + self.local_buffer_ids_by_entry_id + .insert(entry_id, buffer_id); + } + } + + if let Some((client, project_id)) = &self.downstream_client(cx) { + client + .send(proto::UpdateBufferFile { + project_id: *project_id, + buffer_id: buffer_id.to_proto(), + file: Some(new_file.to_proto(cx)), + }) + .ok(); + } + + buffer.file_updated(Arc::new(new_file), cx); + Some(events) + })?; + self.buffer_store + .update(cx, |_buffer_store, cx| { + for event in events { + cx.emit(event); + } + }) + .log_err()?; + + None + } + + fn downstream_client(&self, cx: &AppContext) -> Option<(AnyProtoClient, u64)> { + self.buffer_store + .upgrade()? + .read(cx) + .downstream_client + .clone() + } + + fn buffer_changed_file(&mut self, buffer: Model, cx: &mut AppContext) -> Option<()> { + let file = File::from_dyn(buffer.read(cx).file())?; + + let remote_id = buffer.read(cx).remote_id(); + if let Some(entry_id) = file.entry_id { + match self.local_buffer_ids_by_entry_id.get(&entry_id) { + Some(_) => { + return None; + } + None => { + self.local_buffer_ids_by_entry_id + .insert(entry_id, remote_id); + } + } + }; + self.local_buffer_ids_by_path.insert( + ProjectPath { + worktree_id: file.worktree_id(cx), + path: file.path.clone(), + }, + remote_id, + ); + + Some(()) + } +} + +impl BufferStoreImpl for Model { + fn as_remote(&self) -> Option> { + None + } + + fn as_local(&self) -> Option> { + Some(self.clone()) + } + + fn save_buffer( + &self, + buffer: Model, + cx: &mut ModelContext, + ) -> Task> { + self.update(cx, |this, cx| { + let Some(file) = File::from_dyn(buffer.read(cx).file()) else { + return Task::ready(Err(anyhow!("buffer doesn't have a file"))); + }; + let worktree = file.worktree.clone(); + this.save_local_buffer(buffer, worktree, file.path.clone(), false, cx) + }) + } + + fn save_buffer_as( + &self, + buffer: Model, + path: ProjectPath, + cx: &mut ModelContext, + ) -> Task> { + self.update(cx, |this, cx| { + let Some(worktree) = this + .worktree_store + .read(cx) + .worktree_for_id(path.worktree_id, cx) + else { + return Task::ready(Err(anyhow!("no such worktree"))); + }; + this.save_local_buffer(buffer, worktree, path.path.clone(), true, cx) + }) + } + + fn open_buffer( + &self, + path: Arc, + worktree: Model, + cx: &mut ModelContext, + ) -> Task>> { + let buffer_store = cx.weak_model(); + self.update(cx, |_, cx| { + let load_buffer = worktree.update(cx, |worktree, cx| { + let load_file = worktree.load_file(path.as_ref(), cx); + let reservation = cx.reserve_model(); + let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64()); + cx.spawn(move |_, mut cx| async move { + let loaded = load_file.await?; + let text_buffer = cx + .background_executor() + .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) }) + .await; + cx.insert_model(reservation, |_| { + Buffer::build( + text_buffer, + loaded.diff_base, + Some(loaded.file), + Capability::ReadWrite, + ) + }) + }) + }); + + cx.spawn(move |this, mut cx| async move { + let buffer = match load_buffer.await { + Ok(buffer) => Ok(buffer), + Err(error) if is_not_found_error(&error) => cx.new_model(|cx| { + let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64()); + let text_buffer = text::Buffer::new(0, buffer_id, "".into()); + Buffer::build( + text_buffer, + None, + Some(Arc::new(File { + worktree, + path, + mtime: None, + entry_id: None, + is_local: true, + is_deleted: false, + is_private: false, + })), + Capability::ReadWrite, + ) + }), + Err(e) => Err(e), + }?; + this.update(&mut cx, |this, cx| { + buffer_store.update(cx, |buffer_store, cx| { + buffer_store.add_buffer(buffer.clone(), cx) + })??; + let buffer_id = buffer.read(cx).remote_id(); + if let Some(file) = File::from_dyn(buffer.read(cx).file()) { + this.local_buffer_ids_by_path.insert( + ProjectPath { + worktree_id: file.worktree_id(cx), + path: file.path.clone(), + }, + buffer_id, + ); + + if let Some(entry_id) = file.entry_id { + this.local_buffer_ids_by_entry_id + .insert(entry_id, buffer_id); + } + } + + anyhow::Ok(()) + })??; + + Ok(buffer) + }) + }) + } + + fn create_buffer(&self, cx: &mut ModelContext) -> Task>> { + let handle = self.clone(); + cx.spawn(|buffer_store, mut cx| async move { + let buffer = cx.new_model(|cx| { + Buffer::local("", cx).with_language(language::PLAIN_TEXT.clone(), cx) + })?; + buffer_store.update(&mut cx, |buffer_store, cx| { + buffer_store.add_buffer(buffer.clone(), cx).log_err(); + let buffer_id = buffer.read(cx).remote_id(); + handle.update(cx, |this, cx| { + if let Some(file) = File::from_dyn(buffer.read(cx).file()) { + this.local_buffer_ids_by_path.insert( + ProjectPath { + worktree_id: file.worktree_id(cx), + path: file.path.clone(), + }, + buffer_id, + ); + + if let Some(entry_id) = file.entry_id { + this.local_buffer_ids_by_entry_id + .insert(entry_id, buffer_id); + } + } + }); + })?; + Ok(buffer) + }) + } + + fn reload_buffers( + &self, + buffers: Vec>, + push_to_history: bool, + cx: &mut ModelContext, + ) -> Task> { + cx.spawn(move |_, mut cx| async move { + let mut project_transaction = ProjectTransaction::default(); + for buffer in buffers { + let transaction = buffer + .update(&mut cx, |buffer, cx| buffer.reload(cx))? + .await?; + buffer.update(&mut cx, |buffer, cx| { + if let Some(transaction) = transaction { + if !push_to_history { + buffer.forget_transaction(transaction.id); + } + project_transaction.0.insert(cx.handle(), transaction); + } + })?; + } -impl EventEmitter for BufferStore {} + Ok(project_transaction) + }) + } +} impl BufferStore { pub fn init(client: &AnyProtoClient) { @@ -90,24 +975,31 @@ impl BufferStore { /// Creates a buffer store, optionally retaining its buffers. pub fn local(worktree_store: Model, cx: &mut ModelContext) -> Self { - cx.subscribe(&worktree_store, |this, _, event, cx| { - if let WorktreeStoreEvent::WorktreeAdded(worktree) = event { - this.subscribe_to_worktree(worktree, cx); - } - }) - .detach(); - + let this = cx.weak_model(); Self { - state: BufferStoreState::Local {}, + state: Box::new(cx.new_model(|cx| { + let subscription = cx.subscribe( + &worktree_store, + |this: &mut LocalBufferStore, _, event, cx| { + if let WorktreeStoreEvent::WorktreeAdded(worktree) = event { + this.subscribe_to_worktree(worktree, cx); + } + }, + ); + + LocalBufferStore { + local_buffer_ids_by_path: Default::default(), + local_buffer_ids_by_entry_id: Default::default(), + buffer_store: this, + worktree_store: worktree_store.clone(), + _subscription: subscription, + } + })), downstream_client: None, - worktree_store, opened_buffers: Default::default(), - remote_buffer_listeners: Default::default(), - loading_remote_buffers_by_id: Default::default(), - local_buffer_ids_by_path: Default::default(), - local_buffer_ids_by_entry_id: Default::default(), - loading_buffers_by_path: Default::default(), shared_buffers: Default::default(), + loading_buffers_by_path: Default::default(), + worktree_store, } } @@ -117,28 +1009,22 @@ impl BufferStore { remote_id: u64, cx: &mut ModelContext, ) -> Self { - cx.subscribe(&worktree_store, |this, _, event, cx| { - if let WorktreeStoreEvent::WorktreeAdded(worktree) = event { - this.subscribe_to_worktree(worktree, cx); - } - }) - .detach(); - + let this = cx.weak_model(); Self { - state: BufferStoreState::Remote { + state: Box::new(cx.new_model(|_| RemoteBufferStore { shared_with_me: Default::default(), - upstream_client, + loading_remote_buffers_by_id: Default::default(), + remote_buffer_listeners: Default::default(), project_id: remote_id, - }, + upstream_client, + worktree_store: worktree_store.clone(), + buffer_store: this, + })), downstream_client: None, - worktree_store, opened_buffers: Default::default(), - remote_buffer_listeners: Default::default(), - loading_remote_buffers_by_id: Default::default(), - local_buffer_ids_by_path: Default::default(), - local_buffer_ids_by_entry_id: Default::default(), loading_buffers_by_path: Default::default(), shared_buffers: Default::default(), + worktree_store, } } @@ -171,18 +1057,13 @@ impl BufferStore { entry.insert(rx.clone()); let project_path = project_path.clone(); - let load_buffer = match worktree.read(cx) { - Worktree::Local(_) => { - self.open_local_buffer_internal(project_path.path.clone(), worktree, cx) - } - Worktree::Remote(tree) => { - self.open_remote_buffer_internal(&project_path.path, tree, cx) - } - }; + let load_buffer = self + .state + .open_buffer(project_path.path.clone(), worktree, cx); cx.spawn(move |this, mut cx| async move { let load_result = load_buffer.await; - *tx.borrow_mut() = Some(this.update(&mut cx, |this, _| { + *tx.borrow_mut() = Some(this.update(&mut cx, |this, _cx| { // Record the fact that the buffer is no longer loading. this.loading_buffers_by_path.remove(&project_path); let buffer = load_result.map_err(Arc::new)?; @@ -201,391 +1082,32 @@ impl BufferStore { .map_err(|e| e.cloned()) }) } - - fn subscribe_to_worktree(&mut self, worktree: &Model, cx: &mut ModelContext) { - cx.subscribe(worktree, |this, worktree, event, cx| { - if worktree.read(cx).is_local() { - match event { - worktree::Event::UpdatedEntries(changes) => { - this.local_worktree_entries_changed(&worktree, changes, cx); - } - worktree::Event::UpdatedGitRepositories(updated_repos) => { - this.local_worktree_git_repos_changed(worktree.clone(), updated_repos, cx) - } - _ => {} - } - } - }) - .detach(); - } - - fn local_worktree_entries_changed( - &mut self, - worktree_handle: &Model, - changes: &[(Arc, ProjectEntryId, PathChange)], - cx: &mut ModelContext, - ) { - let snapshot = worktree_handle.read(cx).snapshot(); - for (path, entry_id, _) in changes { - self.local_worktree_entry_changed(*entry_id, path, worktree_handle, &snapshot, cx); - } - } - - fn local_worktree_git_repos_changed( - &mut self, - worktree_handle: Model, - changed_repos: &UpdatedGitRepositoriesSet, - cx: &mut ModelContext, - ) { - debug_assert!(worktree_handle.read(cx).is_local()); - - // Identify the loading buffers whose containing repository that has changed. - let future_buffers = self - .loading_buffers() - .filter_map(|(project_path, receiver)| { - if project_path.worktree_id != worktree_handle.read(cx).id() { - return None; - } - let path = &project_path.path; - changed_repos - .iter() - .find(|(work_dir, _)| path.starts_with(work_dir))?; - let path = path.clone(); - Some(async move { - Self::wait_for_loading_buffer(receiver) - .await - .ok() - .map(|buffer| (buffer, path)) - }) - }) - .collect::>(); - - // Identify the current buffers whose containing repository has changed. - let current_buffers = self - .buffers() - .filter_map(|buffer| { - let file = File::from_dyn(buffer.read(cx).file())?; - if file.worktree != worktree_handle { - return None; - } - changed_repos - .iter() - .find(|(work_dir, _)| file.path.starts_with(work_dir))?; - Some((buffer, file.path.clone())) - }) - .collect::>(); - - if future_buffers.len() + current_buffers.len() == 0 { - return; - } - - cx.spawn(move |this, mut cx| async move { - // Wait for all of the buffers to load. - let future_buffers = future_buffers.collect::>().await; - - // Reload the diff base for every buffer whose containing git repository has changed. - let snapshot = - worktree_handle.update(&mut cx, |tree, _| tree.as_local().unwrap().snapshot())?; - let diff_bases_by_buffer = cx - .background_executor() - .spawn(async move { - let mut diff_base_tasks = future_buffers - .into_iter() - .flatten() - .chain(current_buffers) - .filter_map(|(buffer, path)| { - let (repo_entry, local_repo_entry) = snapshot.repo_for_path(&path)?; - let relative_path = repo_entry.relativize(&snapshot, &path).ok()?; - Some(async move { - let base_text = - local_repo_entry.repo().load_index_text(&relative_path); - Some((buffer, base_text)) - }) - }) - .collect::>(); - - let mut diff_bases = Vec::with_capacity(diff_base_tasks.len()); - while let Some(diff_base) = diff_base_tasks.next().await { - if let Some(diff_base) = diff_base { - diff_bases.push(diff_base); - } - } - diff_bases - }) - .await; - - this.update(&mut cx, |this, cx| { - // Assign the new diff bases on all of the buffers. - for (buffer, diff_base) in diff_bases_by_buffer { - let buffer_id = buffer.update(cx, |buffer, cx| { - buffer.set_diff_base(diff_base.clone(), cx); - buffer.remote_id().to_proto() - }); - if let Some((client, project_id)) = &this.downstream_client { - client - .send(proto::UpdateDiffBase { - project_id: *project_id, - buffer_id, - diff_base, - }) - .log_err(); - } - } - }) - }) - .detach_and_log_err(cx); - } - - fn open_local_buffer_internal( - &mut self, - path: Arc, - worktree: Model, - cx: &mut ModelContext, - ) -> Task>> { - let load_buffer = worktree.update(cx, |worktree, cx| { - let load_file = worktree.load_file(path.as_ref(), cx); - let reservation = cx.reserve_model(); - let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64()); - cx.spawn(move |_, mut cx| async move { - let loaded = load_file.await?; - let text_buffer = cx - .background_executor() - .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) }) - .await; - cx.insert_model(reservation, |_| { - Buffer::build( - text_buffer, - loaded.diff_base, - Some(loaded.file), - Capability::ReadWrite, - ) - }) - }) - }); - - cx.spawn(move |this, mut cx| async move { - let buffer = match load_buffer.await { - Ok(buffer) => Ok(buffer), - Err(error) if is_not_found_error(&error) => cx.new_model(|cx| { - let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64()); - let text_buffer = text::Buffer::new(0, buffer_id, "".into()); - Buffer::build( - text_buffer, - None, - Some(Arc::new(File { - worktree, - path, - mtime: None, - entry_id: None, - is_local: true, - is_deleted: false, - is_private: false, - })), - Capability::ReadWrite, - ) - }), - Err(e) => Err(e), - }?; - this.update(&mut cx, |this, cx| { - this.add_buffer(buffer.clone(), cx).log_err(); - })?; - Ok(buffer) - }) - } - - fn open_remote_buffer_internal( - &self, - path: &Arc, - worktree: &RemoteWorktree, - cx: &ModelContext, - ) -> Task>> { - let worktree_id = worktree.id().to_proto(); - let project_id = worktree.project_id(); - let client = worktree.client(); - let path_string = path.clone().to_string_lossy().to_string(); - cx.spawn(move |this, mut cx| async move { - let response = client - .request(proto::OpenBufferByPath { - project_id, - worktree_id, - path: path_string, - }) - .await?; - let buffer_id = BufferId::new(response.buffer_id)?; - this.update(&mut cx, |this, cx| { - this.wait_for_remote_buffer(buffer_id, cx) - })? - .await - }) - } - - pub fn create_buffer( - &mut self, - remote_client: Option<(AnyProtoClient, u64)>, - cx: &mut ModelContext, - ) -> Task>> { - if let Some((remote_client, project_id)) = remote_client { - let create = remote_client.request(proto::OpenNewBuffer { project_id }); - cx.spawn(|this, mut cx| async move { - let response = create.await?; - let buffer_id = BufferId::new(response.buffer_id)?; - - this.update(&mut cx, |this, cx| { - this.wait_for_remote_buffer(buffer_id, cx) - })? - .await - }) - } else { - Task::ready(Ok(self.create_local_buffer("", None, cx))) - } - } - - pub fn create_local_buffer( - &mut self, - text: &str, - language: Option>, - cx: &mut ModelContext, - ) -> Model { - let buffer = cx.new_model(|cx| { - Buffer::local(text, cx) - .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx) - }); - self.add_buffer(buffer.clone(), cx).log_err(); - buffer - } - - pub fn save_buffer( - &mut self, - buffer: Model, - cx: &mut ModelContext, - ) -> Task> { - let Some(file) = File::from_dyn(buffer.read(cx).file()) else { - return Task::ready(Err(anyhow!("buffer doesn't have a file"))); - }; - match file.worktree.read(cx) { - Worktree::Local(_) => { - self.save_local_buffer(file.worktree.clone(), buffer, file.path.clone(), false, cx) - } - Worktree::Remote(tree) => self.save_remote_buffer(buffer, None, tree, cx), - } - } - - pub fn save_buffer_as( - &mut self, - buffer: Model, - path: ProjectPath, - cx: &mut ModelContext, - ) -> Task> { - let Some(worktree) = self - .worktree_store - .read(cx) - .worktree_for_id(path.worktree_id, cx) - else { - return Task::ready(Err(anyhow!("no such worktree"))); - }; - - let old_file = buffer.read(cx).file().cloned(); - - let task = match worktree.read(cx) { - Worktree::Local(_) => { - self.save_local_buffer(worktree, buffer.clone(), path.path, true, cx) - } - Worktree::Remote(tree) => { - self.save_remote_buffer(buffer.clone(), Some(path.to_proto()), tree, cx) - } - }; - cx.spawn(|this, mut cx| async move { - task.await?; - this.update(&mut cx, |_, cx| { - cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file }); - }) - }) + + pub fn create_buffer(&mut self, cx: &mut ModelContext) -> Task>> { + self.state.create_buffer(cx) } - fn save_local_buffer( - &self, - worktree: Model, - buffer_handle: Model, - path: Arc, - mut has_changed_file: bool, + pub fn save_buffer( + &mut self, + buffer: Model, cx: &mut ModelContext, ) -> Task> { - let buffer = buffer_handle.read(cx); - let text = buffer.as_rope().clone(); - let line_ending = buffer.line_ending(); - let version = buffer.version(); - let buffer_id = buffer.remote_id(); - if buffer.file().is_some_and(|file| !file.is_created()) { - has_changed_file = true; - } - - let save = worktree.update(cx, |worktree, cx| { - worktree.write_file(path.as_ref(), text, line_ending, cx) - }); - - cx.spawn(move |this, mut cx| async move { - let new_file = save.await?; - let mtime = new_file.mtime; - this.update(&mut cx, |this, cx| { - if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() { - let project_id = *project_id; - if has_changed_file { - downstream_client - .send(proto::UpdateBufferFile { - project_id, - buffer_id: buffer_id.to_proto(), - file: Some(language::File::to_proto(&*new_file, cx)), - }) - .log_err(); - } - downstream_client - .send(proto::BufferSaved { - project_id, - buffer_id: buffer_id.to_proto(), - version: serialize_version(&version), - mtime: mtime.map(|time| time.into()), - }) - .log_err(); - } - })?; - buffer_handle.update(&mut cx, |buffer, cx| { - if has_changed_file { - buffer.file_updated(new_file, cx); - } - buffer.did_save(version.clone(), mtime, cx); - }) - }) + self.state.save_buffer(buffer, cx) } - fn save_remote_buffer( - &self, - buffer_handle: Model, - new_path: Option, - tree: &RemoteWorktree, - cx: &ModelContext, + pub fn save_buffer_as( + &mut self, + buffer: Model, + path: ProjectPath, + cx: &mut ModelContext, ) -> Task> { - let buffer = buffer_handle.read(cx); - let buffer_id = buffer.remote_id().into(); - let version = buffer.version(); - let rpc = tree.client(); - let project_id = tree.project_id(); - cx.spawn(move |_, mut cx| async move { - let response = rpc - .request(proto::SaveBuffer { - project_id, - buffer_id, - new_path, - version: serialize_version(&version), - }) - .await?; - let version = deserialize_version(&response.version); - let mtime = response.mtime.map(|mtime| mtime.into()); - - buffer_handle.update(&mut cx, |buffer, cx| { - buffer.did_save(version.clone(), mtime, cx); - })?; - - Ok(()) + let old_file = buffer.read(cx).file().cloned(); + let task = self.state.save_buffer_as(buffer.clone(), path, cx); + cx.spawn(|this, mut cx| async move { + task.await?; + this.update(&mut cx, |_, cx| { + cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file }); + }) }) } @@ -684,29 +1206,6 @@ impl BufferStore { } } - if let Some(senders) = self.remote_buffer_listeners.remove(&remote_id) { - for sender in senders { - sender.send(Ok(buffer.clone())).ok(); - } - } - - if let Some(file) = File::from_dyn(buffer.read(cx).file()) { - if file.is_local { - self.local_buffer_ids_by_path.insert( - ProjectPath { - worktree_id: file.worktree_id(cx), - path: file.path.clone(), - }, - remote_id, - ); - - if let Some(entry_id) = file.entry_id { - self.local_buffer_ids_by_entry_id - .insert(entry_id, remote_id); - } - } - } - cx.subscribe(&buffer, Self::on_buffer_event).detach(); cx.emit(BufferStoreEvent::BufferAdded(buffer)); Ok(()) @@ -753,23 +1252,20 @@ impl BufferStore { .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id)) } - pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option> { - self.get(buffer_id) - .or_else(|| self.loading_remote_buffers_by_id.get(&buffer_id).cloned()) - } - - pub fn wait_for_remote_buffer( - &mut self, - id: BufferId, - cx: &mut AppContext, - ) -> Task>> { - let buffer = self.get(id); - if let Some(buffer) = buffer { - return Task::ready(Ok(buffer)); - } - let (tx, rx) = oneshot::channel(); - self.remote_buffer_listeners.entry(id).or_default().push(tx); - cx.background_executor().spawn(async move { rx.await? }) + pub fn get_possibly_incomplete( + &self, + buffer_id: BufferId, + cx: &AppContext, + ) -> Option> { + self.get(buffer_id).or_else(|| { + self.state.as_remote().and_then(|remote| { + remote + .read(cx) + .loading_remote_buffers_by_id + .get(&buffer_id) + .cloned() + }) + }) } pub fn buffer_version_info( @@ -787,15 +1283,19 @@ impl BufferStore { }) .collect(); let incomplete_buffer_ids = self - .loading_remote_buffers_by_id - .keys() - .copied() - .collect::>(); + .state + .as_remote() + .map(|remote| remote.read(cx).incomplete_buffer_ids()) + .unwrap_or_default(); (buffers, incomplete_buffer_ids) } pub fn disconnected_from_host(&mut self, cx: &mut AppContext) { - self.drop_unnecessary_buffers(cx); + for open_buffer in self.opened_buffers.values_mut() { + if let Some(buffer) = open_buffer.upgrade() { + buffer.update(cx, |buffer, _| buffer.give_up_waiting()); + } + } for buffer in self.buffers() { buffer.update(cx, |buffer, cx| { @@ -803,9 +1303,13 @@ impl BufferStore { }); } - // Wake up all futures currently waiting on a buffer to get opened, - // to give them a chance to fail now that we've disconnected. - self.remote_buffer_listeners.clear(); + if let Some(remote) = self.state.as_remote() { + remote.update(cx, |remote, _| { + // Wake up all futures currently waiting on a buffer to get opened, + // to give them a chance to fail now that we've disconnected. + remote.remote_buffer_listeners.clear() + }) + } } pub fn shared( @@ -822,14 +1326,6 @@ impl BufferStore { self.forget_shared_buffers(); } - fn drop_unnecessary_buffers(&mut self, cx: &mut AppContext) { - for open_buffer in self.opened_buffers.values_mut() { - if let Some(buffer) = open_buffer.upgrade() { - buffer.update(cx, |buffer, _| buffer.give_up_waiting()); - } - } - } - pub fn discard_incomplete(&mut self) { self.opened_buffers .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_))); @@ -897,7 +1393,11 @@ impl BufferStore { ) { match event { BufferEvent::FileHandleChanged => { - self.buffer_changed_file(buffer, cx); + if let Some(local) = self.state.as_local() { + local.update(cx, |local, cx| { + local.buffer_changed_file(buffer, cx); + }) + } } BufferEvent::Reloaded => { let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else { @@ -905,164 +1405,17 @@ impl BufferStore { }; let buffer = buffer.read(cx); downstream_client - .send(proto::BufferReloaded { - project_id: *project_id, - buffer_id: buffer.remote_id().to_proto(), - version: serialize_version(&buffer.version()), - mtime: buffer.saved_mtime().map(|t| t.into()), - line_ending: serialize_line_ending(buffer.line_ending()) as i32, - }) - .log_err(); - } - _ => {} - } - } - - fn local_worktree_entry_changed( - &mut self, - entry_id: ProjectEntryId, - path: &Arc, - worktree: &Model, - snapshot: &worktree::Snapshot, - cx: &mut ModelContext, - ) -> Option<()> { - let project_path = ProjectPath { - worktree_id: snapshot.id(), - path: path.clone(), - }; - let buffer_id = match self.local_buffer_ids_by_entry_id.get(&entry_id) { - Some(&buffer_id) => buffer_id, - None => self.local_buffer_ids_by_path.get(&project_path).copied()?, - }; - let buffer = if let Some(buffer) = self.get(buffer_id) { - buffer - } else { - self.opened_buffers.remove(&buffer_id); - self.local_buffer_ids_by_path.remove(&project_path); - self.local_buffer_ids_by_entry_id.remove(&entry_id); - return None; - }; - - let events = buffer.update(cx, |buffer, cx| { - let file = buffer.file()?; - let old_file = File::from_dyn(Some(file))?; - if old_file.worktree != *worktree { - return None; - } - - let new_file = if let Some(entry) = old_file - .entry_id - .and_then(|entry_id| snapshot.entry_for_id(entry_id)) - { - File { - is_local: true, - entry_id: Some(entry.id), - mtime: entry.mtime, - path: entry.path.clone(), - worktree: worktree.clone(), - is_deleted: false, - is_private: entry.is_private, - } - } else if let Some(entry) = snapshot.entry_for_path(old_file.path.as_ref()) { - File { - is_local: true, - entry_id: Some(entry.id), - mtime: entry.mtime, - path: entry.path.clone(), - worktree: worktree.clone(), - is_deleted: false, - is_private: entry.is_private, - } - } else { - File { - is_local: true, - entry_id: old_file.entry_id, - path: old_file.path.clone(), - mtime: old_file.mtime, - worktree: worktree.clone(), - is_deleted: true, - is_private: old_file.is_private, - } - }; - - if new_file == *old_file { - return None; - } - - let mut events = Vec::new(); - if new_file.path != old_file.path { - self.local_buffer_ids_by_path.remove(&ProjectPath { - path: old_file.path.clone(), - worktree_id: old_file.worktree_id(cx), - }); - self.local_buffer_ids_by_path.insert( - ProjectPath { - worktree_id: new_file.worktree_id(cx), - path: new_file.path.clone(), - }, - buffer_id, - ); - events.push(BufferStoreEvent::BufferChangedFilePath { - buffer: cx.handle(), - old_file: buffer.file().cloned(), - }); - } - - if new_file.entry_id != old_file.entry_id { - if let Some(entry_id) = old_file.entry_id { - self.local_buffer_ids_by_entry_id.remove(&entry_id); - } - if let Some(entry_id) = new_file.entry_id { - self.local_buffer_ids_by_entry_id - .insert(entry_id, buffer_id); - } - } - - if let Some((client, project_id)) = &self.downstream_client { - client - .send(proto::UpdateBufferFile { + .send(proto::BufferReloaded { project_id: *project_id, - buffer_id: buffer_id.to_proto(), - file: Some(new_file.to_proto(cx)), + buffer_id: buffer.remote_id().to_proto(), + version: serialize_version(&buffer.version()), + mtime: buffer.saved_mtime().map(|t| t.into()), + line_ending: serialize_line_ending(buffer.line_ending()) as i32, }) - .ok(); + .log_err(); } - - buffer.file_updated(Arc::new(new_file), cx); - Some(events) - })?; - - for event in events { - cx.emit(event); + _ => {} } - - None - } - - fn buffer_changed_file(&mut self, buffer: Model, cx: &mut AppContext) -> Option<()> { - let file = File::from_dyn(buffer.read(cx).file())?; - - let remote_id = buffer.read(cx).remote_id(); - if let Some(entry_id) = file.entry_id { - match self.local_buffer_ids_by_entry_id.get(&entry_id) { - Some(_) => { - return None; - } - None => { - self.local_buffer_ids_by_entry_id - .insert(entry_id, remote_id); - } - } - }; - self.local_buffer_ids_by_path.insert( - ProjectPath { - worktree_id: file.worktree_id(cx), - path: file.path.clone(), - }, - remote_id, - ); - - Some(()) } pub async fn handle_update_buffer( @@ -1186,93 +1539,14 @@ impl BufferStore { capability: Capability, cx: &mut ModelContext, ) -> Result<()> { - match envelope - .payload - .variant - .ok_or_else(|| anyhow!("missing variant"))? - { - proto::create_buffer_for_peer::Variant::State(mut state) => { - let buffer_id = BufferId::new(state.id)?; - - let buffer_result = maybe!({ - let mut buffer_file = None; - if let Some(file) = state.file.take() { - let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id); - let worktree = self - .worktree_store - .read(cx) - .worktree_for_id(worktree_id, cx) - .ok_or_else(|| { - anyhow!("no worktree found for id {}", file.worktree_id) - })?; - buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?) - as Arc); - } - Buffer::from_proto(replica_id, capability, state, buffer_file) - }); - - match buffer_result { - Ok(buffer) => { - let buffer = cx.new_model(|_| buffer); - self.loading_remote_buffers_by_id.insert(buffer_id, buffer); - } - Err(error) => { - if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) { - for listener in listeners { - listener.send(Err(anyhow!(error.cloned()))).ok(); - } - } - } - } - } - proto::create_buffer_for_peer::Variant::Chunk(chunk) => { - let buffer_id = BufferId::new(chunk.buffer_id)?; - let buffer = self - .loading_remote_buffers_by_id - .get(&buffer_id) - .cloned() - .ok_or_else(|| { - anyhow!( - "received chunk for buffer {} without initial state", - chunk.buffer_id - ) - })?; - - let result = maybe!({ - let operations = chunk - .operations - .into_iter() - .map(language::proto::deserialize_operation) - .collect::>>()?; - buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx)); - anyhow::Ok(()) - }); + let Some(remote) = self.state.as_remote() else { + return Err(anyhow!("buffer store is not a remote")); + }; - if let Err(error) = result { - self.loading_remote_buffers_by_id.remove(&buffer_id); - if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) { - for listener in listeners { - listener.send(Err(error.cloned())).ok(); - } - } - } else if chunk.is_last { - self.loading_remote_buffers_by_id.remove(&buffer_id); - // retain buffers sent by peers to avoid races. - match &mut self.state { - BufferStoreState::Remote { - ref mut shared_with_me, - upstream_client, - .. - } => { - if upstream_client.is_via_collab() { - shared_with_me.insert(buffer.clone()); - } - } - _ => {} - } - self.add_buffer(buffer, cx)?; - } - } + if let Some(buffer) = remote.update(cx, |remote, cx| { + remote.handle_create_buffer_for_peer(envelope, replica_id, capability, cx) + })? { + self.add_buffer(buffer, cx)?; } Ok(()) @@ -1288,7 +1562,7 @@ impl BufferStore { this.update(&mut cx, |this, cx| { let payload = envelope.payload.clone(); - if let Some(buffer) = this.get_possibly_incomplete(buffer_id) { + if let Some(buffer) = this.get_possibly_incomplete(buffer_id, cx) { let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?; let worktree = this .worktree_store @@ -1313,6 +1587,15 @@ impl BufferStore { cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file }); } } + if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() { + downstream_client + .send(proto::UpdateBufferFile { + project_id: *project_id, + buffer_id: buffer_id.into(), + file: envelope.payload.file, + }) + .log_err(); + } Ok(()) })? } @@ -1325,11 +1608,20 @@ impl BufferStore { this.update(&mut cx, |this, cx| { let buffer_id = envelope.payload.buffer_id; let buffer_id = BufferId::new(buffer_id)?; - if let Some(buffer) = this.get_possibly_incomplete(buffer_id) { + if let Some(buffer) = this.get_possibly_incomplete(buffer_id, cx) { buffer.update(cx, |buffer, cx| { - buffer.set_diff_base(envelope.payload.diff_base, cx) + buffer.set_diff_base(envelope.payload.diff_base.clone(), cx) }); } + if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() { + downstream_client + .send(proto::UpdateDiffBase { + project_id: *project_id, + buffer_id: buffer_id.into(), + diff_base: envelope.payload.diff_base, + }) + .log_err(); + } Ok(()) })? } @@ -1408,13 +1700,24 @@ impl BufferStore { ) -> Result<()> { let buffer_id = BufferId::new(envelope.payload.buffer_id)?; let version = deserialize_version(&envelope.payload.version); - let mtime = envelope.payload.mtime.map(|time| time.into()); - this.update(&mut cx, |this, cx| { - if let Some(buffer) = this.get_possibly_incomplete(buffer_id) { + let mtime = envelope.payload.mtime.clone().map(|time| time.into()); + this.update(&mut cx, move |this, cx| { + if let Some(buffer) = this.get_possibly_incomplete(buffer_id, cx) { buffer.update(cx, |buffer, cx| { buffer.did_save(version, mtime, cx); }); } + + if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() { + downstream_client + .send(proto::BufferSaved { + project_id: *project_id, + buffer_id: buffer_id.into(), + mtime: envelope.payload.mtime, + version: envelope.payload.version, + }) + .log_err(); + } }) } @@ -1425,17 +1728,29 @@ impl BufferStore { ) -> Result<()> { let buffer_id = BufferId::new(envelope.payload.buffer_id)?; let version = deserialize_version(&envelope.payload.version); - let mtime = envelope.payload.mtime.map(|time| time.into()); + let mtime = envelope.payload.mtime.clone().map(|time| time.into()); let line_ending = deserialize_line_ending( proto::LineEnding::from_i32(envelope.payload.line_ending) .ok_or_else(|| anyhow!("missing line ending"))?, ); this.update(&mut cx, |this, cx| { - if let Some(buffer) = this.get_possibly_incomplete(buffer_id) { + if let Some(buffer) = this.get_possibly_incomplete(buffer_id, cx) { buffer.update(cx, |buffer, cx| { buffer.did_reload(version, line_ending, mtime, cx); }); } + + if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() { + downstream_client + .send(proto::BufferReloaded { + project_id: *project_id, + buffer_id: buffer_id.into(), + mtime: envelope.payload.mtime, + version: envelope.payload.version, + line_ending: envelope.payload.line_ending, + }) + .log_err(); + } }) } @@ -1480,66 +1795,14 @@ impl BufferStore { push_to_history: bool, cx: &mut ModelContext, ) -> Task> { - let mut local_buffers = Vec::new(); - let mut remote_buffers = Vec::new(); - for buffer_handle in buffers { - let buffer = buffer_handle.read(cx); - if buffer.is_dirty() { - if let Some(file) = File::from_dyn(buffer.file()) { - if file.is_local() { - local_buffers.push(buffer_handle); - } else { - remote_buffers.push(buffer_handle); - } - } - } + let buffers: Vec> = buffers + .into_iter() + .filter(|buffer| buffer.read(cx).is_dirty()) + .collect(); + if buffers.is_empty() { + return Task::ready(Ok(ProjectTransaction::default())); } - - let client = self.upstream_client(); - - cx.spawn(move |this, mut cx| async move { - let mut project_transaction = ProjectTransaction::default(); - if let Some((client, project_id)) = client { - let response = client - .request(proto::ReloadBuffers { - project_id, - buffer_ids: remote_buffers - .iter() - .filter_map(|buffer| { - buffer - .update(&mut cx, |buffer, _| buffer.remote_id().into()) - .ok() - }) - .collect(), - }) - .await? - .transaction - .ok_or_else(|| anyhow!("missing transaction"))?; - BufferStore::deserialize_project_transaction( - this, - response, - push_to_history, - cx.clone(), - ) - .await?; - } - - for buffer in local_buffers { - let transaction = buffer - .update(&mut cx, |buffer, cx| buffer.reload(cx))? - .await?; - buffer.update(&mut cx, |buffer, cx| { - if let Some(transaction) = transaction { - if !push_to_history { - buffer.forget_transaction(transaction.id); - } - project_transaction.0.insert(cx.handle(), transaction); - } - })?; - } - - Ok(project_transaction) - }) + self.state.reload_buffers(buffers, push_to_history, cx) } async fn handle_reload_buffers( @@ -1629,17 +1892,6 @@ impl BufferStore { }) } - pub fn upstream_client(&self) -> Option<(AnyProtoClient, u64)> { - match &self.state { - BufferStoreState::Remote { - upstream_client, - project_id, - .. - } => Some((upstream_client.clone(), *project_id)), - BufferStoreState::Local { .. } => None, - } - } - pub fn forget_shared_buffers(&mut self) { self.shared_buffers.clear(); } @@ -1658,6 +1910,72 @@ impl BufferStore { &self.shared_buffers } + pub fn create_local_buffer( + &mut self, + text: &str, + language: Option>, + cx: &mut ModelContext, + ) -> Model { + let buffer = cx.new_model(|cx| { + Buffer::local(text, cx) + .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx) + }); + + self.add_buffer(buffer.clone(), cx).log_err(); + let buffer_id = buffer.read(cx).remote_id(); + + let local = self + .state + .as_local() + .expect("local-only method called in a non-local context"); + local.update(cx, |this, cx| { + if let Some(file) = File::from_dyn(buffer.read(cx).file()) { + this.local_buffer_ids_by_path.insert( + ProjectPath { + worktree_id: file.worktree_id(cx), + path: file.path.clone(), + }, + buffer_id, + ); + + if let Some(entry_id) = file.entry_id { + this.local_buffer_ids_by_entry_id + .insert(entry_id, buffer_id); + } + } + }); + buffer + } + + pub fn deserialize_project_transaction( + &mut self, + message: proto::ProjectTransaction, + push_to_history: bool, + cx: &mut ModelContext, + ) -> Task> { + if let Some(remote) = self.state.as_remote() { + remote.update(cx, |remote, cx| { + remote.deserialize_project_transaction(message, push_to_history, cx) + }) + } else { + debug_panic!("not a remote buffer store"); + Task::ready(Err(anyhow!("not a remote buffer store"))) + } + } + + pub fn wait_for_remote_buffer( + &self, + id: BufferId, + cx: &mut AppContext, + ) -> Task>> { + if let Some(remote) = self.state.as_remote() { + remote.update(cx, |remote, cx| remote.wait_for_remote_buffer(id, cx)) + } else { + debug_panic!("not a remote buffer store"); + Task::ready(Err(anyhow!("not a remote buffer store"))) + } + } + pub fn serialize_project_transaction_for_peer( &mut self, project_transaction: ProjectTransaction, @@ -1680,41 +1998,6 @@ impl BufferStore { } serialized_transaction } - - pub async fn deserialize_project_transaction( - this: WeakModel, - message: proto::ProjectTransaction, - push_to_history: bool, - mut cx: AsyncAppContext, - ) -> Result { - let mut project_transaction = ProjectTransaction::default(); - for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions) { - let buffer_id = BufferId::new(buffer_id)?; - let buffer = this - .update(&mut cx, |this, cx| { - this.wait_for_remote_buffer(buffer_id, cx) - })? - .await?; - let transaction = language::proto::deserialize_transaction(transaction)?; - project_transaction.0.insert(buffer, transaction); - } - - for (buffer, transaction) in &project_transaction.0 { - buffer - .update(&mut cx, |buffer, _| { - buffer.wait_for_edits(transaction.edit_ids.iter().copied()) - })? - .await?; - - if push_to_history { - buffer.update(&mut cx, |buffer, _| { - buffer.push_transaction(transaction.clone(), Instant::now()); - })?; - } - } - - Ok(project_transaction) - } } impl OpenBuffer { diff --git a/crates/project/src/lsp_command.rs b/crates/project/src/lsp_command.rs index 2b7b10d9b369a9d95a90fde14a571a2adc5c9452..96eb327e8c4344a7dcab8b263216961c282317ef 100644 --- a/crates/project/src/lsp_command.rs +++ b/crates/project/src/lsp_command.rs @@ -1,10 +1,9 @@ mod signature_help; use crate::{ - buffer_store::BufferStore, lsp_store::LspStore, CodeAction, CoreCompletion, DocumentHighlight, - Hover, HoverBlock, HoverBlockKind, InlayHint, InlayHintLabel, InlayHintLabelPart, - InlayHintLabelPartTooltip, InlayHintTooltip, Location, LocationLink, MarkupContent, - ProjectTransaction, ResolveState, + lsp_store::LspStore, CodeAction, CoreCompletion, DocumentHighlight, Hover, HoverBlock, + HoverBlockKind, InlayHint, InlayHintLabel, InlayHintLabelPart, InlayHintLabelPartTooltip, + InlayHintTooltip, Location, LocationLink, MarkupContent, ProjectTransaction, ResolveState, }; use anyhow::{anyhow, Context, Result}; use async_trait::async_trait; @@ -417,18 +416,18 @@ impl LspCommand for PerformRename { message: proto::PerformRenameResponse, lsp_store: Model, _: Model, - cx: AsyncAppContext, + mut cx: AsyncAppContext, ) -> Result { let message = message .transaction .ok_or_else(|| anyhow!("missing transaction"))?; - BufferStore::deserialize_project_transaction( - lsp_store.read_with(&cx, |lsp_store, _| lsp_store.buffer_store().downgrade())?, - message, - self.push_to_history, - cx, - ) - .await + lsp_store + .update(&mut cx, |lsp_store, cx| { + lsp_store.buffer_store().update(cx, |buffer_store, cx| { + buffer_store.deserialize_project_transaction(message, self.push_to_history, cx) + }) + })? + .await } fn buffer_id_from_proto(message: &proto::PerformRename) -> Result { diff --git a/crates/project/src/lsp_store.rs b/crates/project/src/lsp_store.rs index 6c71d4baebf563c0c1f85ad6aaa892c315ef210c..8d859c091bfe93140a7c78a5920956b526fc8efb 100644 --- a/crates/project/src/lsp_store.rs +++ b/crates/project/src/lsp_store.rs @@ -1601,19 +1601,19 @@ impl LspStore { buffer_id: buffer_handle.read(cx).remote_id().into(), action: Some(Self::serialize_code_action(&action)), }; - cx.spawn(move |this, cx| async move { + let buffer_store = self.buffer_store(); + cx.spawn(move |_, mut cx| async move { let response = upstream_client .request(request) .await? .transaction .ok_or_else(|| anyhow!("missing transaction"))?; - BufferStore::deserialize_project_transaction( - this.read_with(&cx, |this, _| this.buffer_store.downgrade())?, - response, - push_to_history, - cx, - ) - .await + + buffer_store + .update(&mut cx, |buffer_store, cx| { + buffer_store.deserialize_project_transaction(response, push_to_history, cx) + })? + .await }) } else { let buffer = buffer_handle.read(cx); @@ -5062,6 +5062,7 @@ impl LspStore { .spawn(this.languages.language_for_name(language_name.0.as_ref())) .detach(); + // host let adapter = this.languages.get_or_register_lsp_adapter( language_name.clone(), server_name.clone(), @@ -5259,7 +5260,8 @@ impl LspStore { result }) } else if let Some((client, project_id)) = self.upstream_client() { - cx.spawn(move |this, mut cx| async move { + let buffer_store = self.buffer_store(); + cx.spawn(move |_, mut cx| async move { let response = client .request(proto::FormatBuffers { project_id, @@ -5274,13 +5276,12 @@ impl LspStore { .await? .transaction .ok_or_else(|| anyhow!("missing transaction"))?; - BufferStore::deserialize_project_transaction( - this.read_with(&cx, |this, _| this.buffer_store.downgrade())?, - response, - push_to_history, - cx, - ) - .await + + buffer_store + .update(&mut cx, |buffer_store, cx| { + buffer_store.deserialize_project_transaction(response, push_to_history, cx) + })? + .await }) } else { Task::ready(Ok(ProjectTransaction::default())) diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 454a7586c8856c2c3280f7822d9767dd9cc210a0..fe4d2d6b01545e5cc1dbc2ea86cf69e3c4a097a0 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -1667,16 +1667,8 @@ impl Project { } pub fn create_buffer(&mut self, cx: &mut ModelContext) -> Task>> { - self.buffer_store.update(cx, |buffer_store, cx| { - buffer_store.create_buffer( - if self.is_via_collab() { - Some((self.client.clone().into(), self.remote_id().unwrap())) - } else { - None - }, - cx, - ) - }) + self.buffer_store + .update(cx, |buffer_store, cx| buffer_store.create_buffer(cx)) } pub fn create_local_buffer( @@ -1685,7 +1677,7 @@ impl Project { language: Option>, cx: &mut ModelContext, ) -> Model { - if self.is_via_collab() { + if self.is_via_collab() || self.is_via_ssh() { panic!("called create_local_buffer on a remote project") } self.buffer_store.update(cx, |buffer_store, cx| { @@ -3770,7 +3762,9 @@ impl Project { envelope: TypedEnvelope, mut cx: AsyncAppContext, ) -> Result { - let buffer = this.update(&mut cx, |this, cx| this.create_local_buffer("", None, cx))?; + let buffer = this + .update(&mut cx, |this, cx| this.create_buffer(cx))? + .await?; let peer_id = envelope.original_sender_id()?; Project::respond_to_open_buffer_request(this, buffer, peer_id, &mut cx) diff --git a/crates/remote_server/src/remote_editing_tests.rs b/crates/remote_server/src/remote_editing_tests.rs index eca65f1349845c083726d85cbdb1bde08421b55a..084fcf9929f0149cb0fe3f44ac3037b4ffea5162 100644 --- a/crates/remote_server/src/remote_editing_tests.rs +++ b/crates/remote_server/src/remote_editing_tests.rs @@ -56,6 +56,7 @@ async fn test_basic_remote_editing(cx: &mut TestAppContext, server_cx: &mut Test }) .await .unwrap(); + buffer.update(cx, |buffer, cx| { assert_eq!(buffer.text(), "fn one() -> usize { 1 }"); assert_eq!(