diff --git a/crates/project/src/buffer_store.rs b/crates/project/src/buffer_store.rs index eb56680fb36f283e723a7c9ef738855b55f4bb3d..55b0f413a9ef45183aee26ac1aece75c9151c882 100644 --- a/crates/project/src/buffer_store.rs +++ b/crates/project/src/buffer_store.rs @@ -29,38 +29,23 @@ use text::BufferId; use util::{debug_panic, maybe, ResultExt as _, TryFutureExt}; use worktree::{File, PathChange, ProjectEntryId, UpdatedGitRepositoriesSet, Worktree, WorktreeId}; -trait BufferStoreImpl { - fn open_buffer( - &self, - path: Arc, - worktree: Model, - cx: &mut ModelContext, - ) -> Task>>; - - fn save_buffer( - &self, - buffer: Model, - cx: &mut ModelContext, - ) -> Task>; - - fn save_buffer_as( - &self, - buffer: Model, - path: ProjectPath, - cx: &mut ModelContext, - ) -> Task>; - - fn create_buffer(&self, cx: &mut ModelContext) -> Task>>; - - fn reload_buffers( - &self, - buffers: HashSet>, - push_to_history: bool, - cx: &mut ModelContext, - ) -> Task>; +/// A set of open buffers. +pub struct BufferStore { + state: BufferStoreState, + #[allow(clippy::type_complexity)] + loading_buffers_by_path: HashMap< + ProjectPath, + postage::watch::Receiver, Arc>>>, + >, + worktree_store: Model, + opened_buffers: HashMap, + downstream_client: Option<(AnyProtoClient, u64)>, + shared_buffers: HashMap>>, +} - fn as_remote(&self) -> Option>; - fn as_local(&self) -> Option>; +enum BufferStoreState { + Local(LocalBufferStore), + Remote(RemoteBufferStore), } struct RemoteBufferStore { @@ -71,31 +56,15 @@ struct RemoteBufferStore { remote_buffer_listeners: HashMap, anyhow::Error>>>>, worktree_store: Model, - buffer_store: WeakModel, } struct LocalBufferStore { local_buffer_ids_by_path: HashMap, local_buffer_ids_by_entry_id: HashMap, - buffer_store: WeakModel, worktree_store: Model, _subscription: Subscription, } -/// A set of open buffers. -pub struct BufferStore { - state: Box, - #[allow(clippy::type_complexity)] - loading_buffers_by_path: HashMap< - ProjectPath, - postage::watch::Receiver, Arc>>>, - >, - worktree_store: Model, - opened_buffers: HashMap, - downstream_client: Option<(AnyProtoClient, u64)>, - shared_buffers: HashMap>>, -} - enum OpenBuffer { Buffer(WeakModel), Operations(Vec), @@ -119,14 +88,13 @@ impl RemoteBufferStore { pub fn wait_for_remote_buffer( &mut self, id: BufferId, - cx: &mut AppContext, + cx: &mut ModelContext, ) -> Task>> { - let buffer_store = self.buffer_store.clone(); let (tx, rx) = oneshot::channel(); self.remote_buffer_listeners.entry(id).or_default().push(tx); - cx.spawn(|cx| async move { - if let Some(buffer) = buffer_store + cx.spawn(|this, cx| async move { + if let Some(buffer) = this .read_with(&cx, |buffer_store, _| buffer_store.get(id)) .ok() .flatten() @@ -144,7 +112,7 @@ impl RemoteBufferStore { &self, buffer_handle: Model, new_path: Option, - cx: &ModelContext, + cx: &ModelContext, ) -> Task> { let buffer = buffer_handle.read(cx); let buffer_id = buffer.remote_id().into(); @@ -176,7 +144,7 @@ impl RemoteBufferStore { envelope: TypedEnvelope, replica_id: u16, capability: Capability, - cx: &mut ModelContext, + cx: &mut ModelContext, ) -> Result>> { match envelope .payload @@ -277,7 +245,7 @@ impl RemoteBufferStore { &self, message: proto::ProjectTransaction, push_to_history: bool, - cx: &mut ModelContext, + cx: &mut ModelContext, ) -> Task> { cx.spawn(|this, mut cx| async move { let mut project_transaction = ProjectTransaction::default(); @@ -310,36 +278,6 @@ impl RemoteBufferStore { Ok(project_transaction) }) } -} - -impl BufferStoreImpl for Model { - fn as_remote(&self) -> Option> { - Some(self.clone()) - } - - fn as_local(&self) -> Option> { - None - } - - fn save_buffer( - &self, - buffer: Model, - cx: &mut ModelContext, - ) -> Task> { - self.update(cx, |this, cx| { - this.save_remote_buffer(buffer.clone(), None, cx) - }) - } - fn save_buffer_as( - &self, - buffer: Model, - path: ProjectPath, - cx: &mut ModelContext, - ) -> Task> { - self.update(cx, |this, cx| { - this.save_remote_buffer(buffer, Some(path.to_proto()), cx) - }) - } fn open_buffer( &self, @@ -347,46 +285,42 @@ impl BufferStoreImpl for Model { worktree: Model, cx: &mut ModelContext, ) -> Task>> { - self.update(cx, |this, cx| { - let worktree_id = worktree.read(cx).id().to_proto(); - let project_id = this.project_id; - let client = this.upstream_client.clone(); - let path_string = path.clone().to_string_lossy().to_string(); - cx.spawn(move |this, mut cx| async move { - let response = client - .request(proto::OpenBufferByPath { - project_id, - worktree_id, - path: path_string, - }) - .await?; - let buffer_id = BufferId::new(response.buffer_id)?; + let worktree_id = worktree.read(cx).id().to_proto(); + let project_id = self.project_id; + let client = self.upstream_client.clone(); + let path_string = path.clone().to_string_lossy().to_string(); + cx.spawn(move |this, mut cx| async move { + let response = client + .request(proto::OpenBufferByPath { + project_id, + worktree_id, + path: path_string, + }) + .await?; + let buffer_id = BufferId::new(response.buffer_id)?; - let buffer = this - .update(&mut cx, { - |this, cx| this.wait_for_remote_buffer(buffer_id, cx) - })? - .await?; + let buffer = this + .update(&mut cx, { + |this, cx| this.wait_for_remote_buffer(buffer_id, cx) + })? + .await?; - Ok(buffer) - }) + Ok(buffer) }) } fn create_buffer(&self, cx: &mut ModelContext) -> Task>> { - self.update(cx, |this, cx| { - let create = this.upstream_client.request(proto::OpenNewBuffer { - project_id: this.project_id, - }); - cx.spawn(|this, mut cx| async move { - let response = create.await?; - let buffer_id = BufferId::new(response.buffer_id)?; + let create = self.upstream_client.request(proto::OpenNewBuffer { + project_id: self.project_id, + }); + cx.spawn(|this, mut cx| async move { + let response = create.await?; + let buffer_id = BufferId::new(response.buffer_id)?; - this.update(&mut cx, |this, cx| { - this.wait_for_remote_buffer(buffer_id, cx) - })? - .await - }) + this.update(&mut cx, |this, cx| { + this.wait_for_remote_buffer(buffer_id, cx) + })? + .await }) } @@ -396,25 +330,23 @@ impl BufferStoreImpl for Model { push_to_history: bool, cx: &mut ModelContext, ) -> Task> { - self.update(cx, |this, cx| { - let request = this.upstream_client.request(proto::ReloadBuffers { - project_id: this.project_id, - buffer_ids: buffers - .iter() - .map(|buffer| buffer.read(cx).remote_id().to_proto()) - .collect(), - }); + let request = self.upstream_client.request(proto::ReloadBuffers { + project_id: self.project_id, + buffer_ids: buffers + .iter() + .map(|buffer| buffer.read(cx).remote_id().to_proto()) + .collect(), + }); - cx.spawn(|this, mut cx| async move { - let response = request - .await? - .transaction - .ok_or_else(|| anyhow!("missing transaction"))?; - this.update(&mut cx, |this, cx| { - this.deserialize_project_transaction(response, push_to_history, cx) - })? - .await - }) + cx.spawn(|this, mut cx| async move { + let response = request + .await? + .transaction + .ok_or_else(|| anyhow!("missing transaction"))?; + this.update(&mut cx, |this, cx| { + this.deserialize_project_transaction(response, push_to_history, cx) + })? + .await }) } } @@ -426,7 +358,7 @@ impl LocalBufferStore { worktree: Model, path: Arc, mut has_changed_file: bool, - cx: &mut ModelContext, + cx: &mut ModelContext, ) -> Task> { let buffer = buffer_handle.read(cx); @@ -449,7 +381,7 @@ impl LocalBufferStore { let new_file = save.await?; let mtime = new_file.disk_state().mtime(); this.update(&mut cx, |this, cx| { - if let Some((downstream_client, project_id)) = this.downstream_client(cx) { + if let Some((downstream_client, project_id)) = this.downstream_client.clone() { if has_changed_file { downstream_client .send(proto::UpdateBufferFile { @@ -478,15 +410,24 @@ impl LocalBufferStore { }) } - fn subscribe_to_worktree(&mut self, worktree: &Model, cx: &mut ModelContext) { + fn subscribe_to_worktree( + &mut self, + worktree: &Model, + cx: &mut ModelContext, + ) { cx.subscribe(worktree, |this, worktree, event, cx| { if worktree.read(cx).is_local() { match event { worktree::Event::UpdatedEntries(changes) => { - this.local_worktree_entries_changed(&worktree, changes, cx); + Self::local_worktree_entries_changed(this, &worktree, changes, cx); } worktree::Event::UpdatedGitRepositories(updated_repos) => { - this.local_worktree_git_repos_changed(worktree.clone(), updated_repos, cx) + Self::local_worktree_git_repos_changed( + this, + worktree.clone(), + updated_repos, + cx, + ) } _ => {} } @@ -496,66 +437,67 @@ impl LocalBufferStore { } fn local_worktree_entries_changed( - &mut self, + this: &mut BufferStore, worktree_handle: &Model, changes: &[(Arc, ProjectEntryId, PathChange)], - cx: &mut ModelContext, + cx: &mut ModelContext, ) { let snapshot = worktree_handle.read(cx).snapshot(); for (path, entry_id, _) in changes { - self.local_worktree_entry_changed(*entry_id, path, worktree_handle, &snapshot, cx); + Self::local_worktree_entry_changed( + this, + *entry_id, + path, + worktree_handle, + &snapshot, + cx, + ); } } fn local_worktree_git_repos_changed( - &mut self, + this: &mut BufferStore, worktree_handle: Model, changed_repos: &UpdatedGitRepositoriesSet, - cx: &mut ModelContext, + cx: &mut ModelContext, ) { debug_assert!(worktree_handle.read(cx).is_local()); - let Some(buffer_store) = self.buffer_store.upgrade() else { - return; - }; // Identify the loading buffers whose containing repository that has changed. - let (future_buffers, current_buffers) = buffer_store.update(cx, |buffer_store, cx| { - let future_buffers = buffer_store - .loading_buffers() - .filter_map(|(project_path, receiver)| { - if project_path.worktree_id != worktree_handle.read(cx).id() { - return None; - } - let path = &project_path.path; - changed_repos - .iter() - .find(|(work_dir, _)| path.starts_with(work_dir))?; - let path = path.clone(); - Some(async move { - BufferStore::wait_for_loading_buffer(receiver) - .await - .ok() - .map(|buffer| (buffer, path)) - }) - }) - .collect::>(); - - // Identify the current buffers whose containing repository has changed. - let current_buffers = buffer_store - .buffers() - .filter_map(|buffer| { - let file = File::from_dyn(buffer.read(cx).file())?; - if file.worktree != worktree_handle { - return None; - } - changed_repos - .iter() - .find(|(work_dir, _)| file.path.starts_with(work_dir))?; - Some((buffer, file.path.clone())) + let future_buffers = this + .loading_buffers() + .filter_map(|(project_path, receiver)| { + if project_path.worktree_id != worktree_handle.read(cx).id() { + return None; + } + let path = &project_path.path; + changed_repos + .iter() + .find(|(work_dir, _)| path.starts_with(work_dir))?; + let path = path.clone(); + Some(async move { + BufferStore::wait_for_loading_buffer(receiver) + .await + .ok() + .map(|buffer| (buffer, path)) }) - .collect::>(); - (future_buffers, current_buffers) - }); + }) + .collect::>(); + + // Identify the current buffers whose containing repository has changed. + let current_buffers = this + .buffers() + .filter_map(|buffer| { + let file = File::from_dyn(buffer.read(cx).file())?; + if file.worktree != worktree_handle { + return None; + } + changed_repos + .iter() + .find(|(work_dir, _)| file.path.starts_with(work_dir))?; + Some((buffer, file.path.clone())) + }) + .collect::>(); if future_buffers.len() + current_buffers.len() == 0 { return; @@ -603,7 +545,7 @@ impl LocalBufferStore { buffer.set_diff_base(diff_base.clone(), cx); buffer.remote_id().to_proto() }); - if let Some((client, project_id)) = &this.downstream_client(cx) { + if let Some((client, project_id)) = &this.downstream_client.clone() { client .send(proto::UpdateDiffBase { project_id: *project_id, @@ -619,42 +561,44 @@ impl LocalBufferStore { } fn local_worktree_entry_changed( - &mut self, + this: &mut BufferStore, entry_id: ProjectEntryId, path: &Arc, worktree: &Model, snapshot: &worktree::Snapshot, - cx: &mut ModelContext, + cx: &mut ModelContext, ) -> Option<()> { let project_path = ProjectPath { worktree_id: snapshot.id(), path: path.clone(), }; - let buffer_id = match self.local_buffer_ids_by_entry_id.get(&entry_id) { - Some(&buffer_id) => buffer_id, - None => self.local_buffer_ids_by_path.get(&project_path).copied()?, + + let buffer_id = { + let local = this.as_local_mut()?; + match local.local_buffer_ids_by_entry_id.get(&entry_id) { + Some(&buffer_id) => buffer_id, + None => local.local_buffer_ids_by_path.get(&project_path).copied()?, + } }; - let buffer = self - .buffer_store - .update(cx, |buffer_store, _| { - if let Some(buffer) = buffer_store.get(buffer_id) { - Some(buffer) - } else { - buffer_store.opened_buffers.remove(&buffer_id); - None - } - }) - .ok() - .flatten(); + + let buffer = if let Some(buffer) = this.get(buffer_id) { + Some(buffer) + } else { + this.opened_buffers.remove(&buffer_id); + None + }; + let buffer = if let Some(buffer) = buffer { buffer } else { - self.local_buffer_ids_by_path.remove(&project_path); - self.local_buffer_ids_by_entry_id.remove(&entry_id); + let this = this.as_local_mut()?; + this.local_buffer_ids_by_path.remove(&project_path); + this.local_buffer_ids_by_entry_id.remove(&entry_id); return None; }; let events = buffer.update(cx, |buffer, cx| { + let local = this.as_local_mut()?; let file = buffer.file()?; let old_file = File::from_dyn(Some(file))?; if old_file.worktree != *worktree { @@ -695,11 +639,11 @@ impl LocalBufferStore { let mut events = Vec::new(); if new_file.path != old_file.path { - self.local_buffer_ids_by_path.remove(&ProjectPath { + local.local_buffer_ids_by_path.remove(&ProjectPath { path: old_file.path.clone(), worktree_id: old_file.worktree_id(cx), }); - self.local_buffer_ids_by_path.insert( + local.local_buffer_ids_by_path.insert( ProjectPath { worktree_id: new_file.worktree_id(cx), path: new_file.path.clone(), @@ -714,15 +658,16 @@ impl LocalBufferStore { if new_file.entry_id != old_file.entry_id { if let Some(entry_id) = old_file.entry_id { - self.local_buffer_ids_by_entry_id.remove(&entry_id); + local.local_buffer_ids_by_entry_id.remove(&entry_id); } if let Some(entry_id) = new_file.entry_id { - self.local_buffer_ids_by_entry_id + local + .local_buffer_ids_by_entry_id .insert(entry_id, buffer_id); } } - if let Some((client, project_id)) = &self.downstream_client(cx) { + if let Some((client, project_id)) = &this.downstream_client { client .send(proto::UpdateBufferFile { project_id: *project_id, @@ -735,23 +680,12 @@ impl LocalBufferStore { buffer.file_updated(Arc::new(new_file), cx); Some(events) })?; - self.buffer_store - .update(cx, |_buffer_store, cx| { - for event in events { - cx.emit(event); - } - }) - .log_err()?; - None - } + for event in events { + cx.emit(event); + } - fn downstream_client(&self, cx: &AppContext) -> Option<(AnyProtoClient, u64)> { - self.buffer_store - .upgrade()? - .read(cx) - .downstream_client - .clone() + None } fn buffer_changed_file(&mut self, buffer: Model, cx: &mut AppContext) -> Option<()> { @@ -779,29 +713,17 @@ impl LocalBufferStore { Some(()) } -} - -impl BufferStoreImpl for Model { - fn as_remote(&self) -> Option> { - None - } - - fn as_local(&self) -> Option> { - Some(self.clone()) - } fn save_buffer( &self, buffer: Model, cx: &mut ModelContext, ) -> Task> { - self.update(cx, |this, cx| { - let Some(file) = File::from_dyn(buffer.read(cx).file()) else { - return Task::ready(Err(anyhow!("buffer doesn't have a file"))); - }; - let worktree = file.worktree.clone(); - this.save_local_buffer(buffer, worktree, file.path.clone(), false, cx) - }) + let Some(file) = File::from_dyn(buffer.read(cx).file()) else { + return Task::ready(Err(anyhow!("buffer doesn't have a file"))); + }; + let worktree = file.worktree.clone(); + self.save_local_buffer(buffer, worktree, file.path.clone(), false, cx) } fn save_buffer_as( @@ -810,16 +732,14 @@ impl BufferStoreImpl for Model { path: ProjectPath, cx: &mut ModelContext, ) -> Task> { - self.update(cx, |this, cx| { - let Some(worktree) = this - .worktree_store - .read(cx) - .worktree_for_id(path.worktree_id, cx) - else { - return Task::ready(Err(anyhow!("no such worktree"))); - }; - this.save_local_buffer(buffer, worktree, path.path.clone(), true, cx) - }) + let Some(worktree) = self + .worktree_store + .read(cx) + .worktree_for_id(path.worktree_id, cx) + else { + return Task::ready(Err(anyhow!("no such worktree"))); + }; + self.save_local_buffer(buffer, worktree, path.path.clone(), true, cx) } fn open_buffer( @@ -828,76 +748,72 @@ impl BufferStoreImpl for Model { worktree: Model, cx: &mut ModelContext, ) -> Task>> { - let buffer_store = cx.weak_model(); - self.update(cx, |_, cx| { - let load_buffer = worktree.update(cx, |worktree, cx| { - let load_file = worktree.load_file(path.as_ref(), cx); - let reservation = cx.reserve_model(); - let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64()); - cx.spawn(move |_, mut cx| async move { - let loaded = load_file.await?; - let text_buffer = cx - .background_executor() - .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) }) - .await; - cx.insert_model(reservation, |_| { - Buffer::build( - text_buffer, - loaded.diff_base, - Some(loaded.file), - Capability::ReadWrite, - ) - }) + let load_buffer = worktree.update(cx, |worktree, cx| { + let load_file = worktree.load_file(path.as_ref(), cx); + let reservation = cx.reserve_model(); + let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64()); + cx.spawn(move |_, mut cx| async move { + let loaded = load_file.await?; + let text_buffer = cx + .background_executor() + .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) }) + .await; + cx.insert_model(reservation, |_| { + Buffer::build( + text_buffer, + loaded.diff_base, + Some(loaded.file), + Capability::ReadWrite, + ) }) - }); + }) + }); - cx.spawn(move |this, mut cx| async move { - let buffer = match load_buffer.await { - Ok(buffer) => Ok(buffer), - Err(error) if is_not_found_error(&error) => cx.new_model(|cx| { - let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64()); - let text_buffer = text::Buffer::new(0, buffer_id, "".into()); - Buffer::build( - text_buffer, - None, - Some(Arc::new(File { - worktree, - path, - disk_state: DiskState::New, - entry_id: None, - is_local: true, - is_private: false, - })), - Capability::ReadWrite, - ) - }), - Err(e) => Err(e), - }?; - this.update(&mut cx, |this, cx| { - buffer_store.update(cx, |buffer_store, cx| { - buffer_store.add_buffer(buffer.clone(), cx) - })??; - let buffer_id = buffer.read(cx).remote_id(); - if let Some(file) = File::from_dyn(buffer.read(cx).file()) { - this.local_buffer_ids_by_path.insert( - ProjectPath { - worktree_id: file.worktree_id(cx), - path: file.path.clone(), - }, - buffer_id, - ); - - if let Some(entry_id) = file.entry_id { - this.local_buffer_ids_by_entry_id - .insert(entry_id, buffer_id); - } + cx.spawn(move |this, mut cx| async move { + let buffer = match load_buffer.await { + Ok(buffer) => Ok(buffer), + Err(error) if is_not_found_error(&error) => cx.new_model(|cx| { + let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64()); + let text_buffer = text::Buffer::new(0, buffer_id, "".into()); + Buffer::build( + text_buffer, + None, + Some(Arc::new(File { + worktree, + path, + disk_state: DiskState::New, + entry_id: None, + is_local: true, + is_private: false, + })), + Capability::ReadWrite, + ) + }), + Err(e) => Err(e), + }?; + this.update(&mut cx, |this, cx| { + this.add_buffer(buffer.clone(), cx)?; + let buffer_id = buffer.read(cx).remote_id(); + if let Some(file) = File::from_dyn(buffer.read(cx).file()) { + let this = this.as_local_mut().unwrap(); + this.local_buffer_ids_by_path.insert( + ProjectPath { + worktree_id: file.worktree_id(cx), + path: file.path.clone(), + }, + buffer_id, + ); + + if let Some(entry_id) = file.entry_id { + this.local_buffer_ids_by_entry_id + .insert(entry_id, buffer_id); } + } - anyhow::Ok(()) - })??; + anyhow::Ok(()) + })??; - Ok(buffer) - }) + Ok(buffer) }) } @@ -954,26 +870,18 @@ impl BufferStore { /// Creates a buffer store, optionally retaining its buffers. pub fn local(worktree_store: Model, cx: &mut ModelContext) -> Self { - let this = cx.weak_model(); Self { - state: Box::new(cx.new_model(|cx| { - let subscription = cx.subscribe( - &worktree_store, - |this: &mut LocalBufferStore, _, event, cx| { - if let WorktreeStoreEvent::WorktreeAdded(worktree) = event { - this.subscribe_to_worktree(worktree, cx); - } - }, - ); - - LocalBufferStore { - local_buffer_ids_by_path: Default::default(), - local_buffer_ids_by_entry_id: Default::default(), - buffer_store: this, - worktree_store: worktree_store.clone(), - _subscription: subscription, - } - })), + state: BufferStoreState::Local(LocalBufferStore { + local_buffer_ids_by_path: Default::default(), + local_buffer_ids_by_entry_id: Default::default(), + worktree_store: worktree_store.clone(), + _subscription: cx.subscribe(&worktree_store, |this, _, event, cx| { + if let WorktreeStoreEvent::WorktreeAdded(worktree) = event { + let this = this.as_local_mut().unwrap(); + this.subscribe_to_worktree(worktree, cx); + } + }), + }), downstream_client: None, opened_buffers: Default::default(), shared_buffers: Default::default(), @@ -986,19 +894,17 @@ impl BufferStore { worktree_store: Model, upstream_client: AnyProtoClient, remote_id: u64, - cx: &mut ModelContext, + _cx: &mut ModelContext, ) -> Self { - let this = cx.weak_model(); Self { - state: Box::new(cx.new_model(|_| RemoteBufferStore { + state: BufferStoreState::Remote(RemoteBufferStore { shared_with_me: Default::default(), loading_remote_buffers_by_id: Default::default(), remote_buffer_listeners: Default::default(), project_id: remote_id, upstream_client, worktree_store: worktree_store.clone(), - buffer_store: this, - })), + }), downstream_client: None, opened_buffers: Default::default(), loading_buffers_by_path: Default::default(), @@ -1007,6 +913,27 @@ impl BufferStore { } } + fn as_local_mut(&mut self) -> Option<&mut LocalBufferStore> { + match &mut self.state { + BufferStoreState::Local(state) => Some(state), + _ => None, + } + } + + fn as_remote_mut(&mut self) -> Option<&mut RemoteBufferStore> { + match &mut self.state { + BufferStoreState::Remote(state) => Some(state), + _ => None, + } + } + + fn as_remote(&self) -> Option<&RemoteBufferStore> { + match &self.state { + BufferStoreState::Remote(state) => Some(state), + _ => None, + } + } + pub fn open_buffer( &mut self, project_path: ProjectPath, @@ -1035,10 +962,11 @@ impl BufferStore { let (mut tx, rx) = postage::watch::channel(); entry.insert(rx.clone()); - let project_path = project_path.clone(); - let load_buffer = self - .state - .open_buffer(project_path.path.clone(), worktree, cx); + let path = project_path.path.clone(); + let load_buffer = match &self.state { + BufferStoreState::Local(this) => this.open_buffer(path, worktree, cx), + BufferStoreState::Remote(this) => this.open_buffer(path, worktree, cx), + }; cx.spawn(move |this, mut cx| async move { let load_result = load_buffer.await; @@ -1063,7 +991,10 @@ impl BufferStore { } pub fn create_buffer(&mut self, cx: &mut ModelContext) -> Task>> { - self.state.create_buffer(cx) + match &self.state { + BufferStoreState::Local(this) => this.create_buffer(cx), + BufferStoreState::Remote(this) => this.create_buffer(cx), + } } pub fn save_buffer( @@ -1071,7 +1002,10 @@ impl BufferStore { buffer: Model, cx: &mut ModelContext, ) -> Task> { - self.state.save_buffer(buffer, cx) + match &mut self.state { + BufferStoreState::Local(this) => this.save_buffer(buffer, cx), + BufferStoreState::Remote(this) => this.save_remote_buffer(buffer.clone(), None, cx), + } } pub fn save_buffer_as( @@ -1081,7 +1015,12 @@ impl BufferStore { cx: &mut ModelContext, ) -> Task> { let old_file = buffer.read(cx).file().cloned(); - let task = self.state.save_buffer_as(buffer.clone(), path, cx); + let task = match &self.state { + BufferStoreState::Local(this) => this.save_buffer_as(buffer.clone(), path, cx), + BufferStoreState::Remote(this) => { + this.save_remote_buffer(buffer.clone(), Some(path.to_proto()), cx) + } + }; cx.spawn(|this, mut cx| async move { task.await?; this.update(&mut cx, |_, cx| { @@ -1306,19 +1245,10 @@ impl BufferStore { .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id)) } - pub fn get_possibly_incomplete( - &self, - buffer_id: BufferId, - cx: &AppContext, - ) -> Option> { + pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option> { self.get(buffer_id).or_else(|| { - self.state.as_remote().and_then(|remote| { - remote - .read(cx) - .loading_remote_buffers_by_id - .get(&buffer_id) - .cloned() - }) + self.as_remote() + .and_then(|remote| remote.loading_remote_buffers_by_id.get(&buffer_id).cloned()) }) } @@ -1337,9 +1267,8 @@ impl BufferStore { }) .collect(); let incomplete_buffer_ids = self - .state .as_remote() - .map(|remote| remote.read(cx).incomplete_buffer_ids()) + .map(|remote| remote.incomplete_buffer_ids()) .unwrap_or_default(); (buffers, incomplete_buffer_ids) } @@ -1357,12 +1286,10 @@ impl BufferStore { }); } - if let Some(remote) = self.state.as_remote() { - remote.update(cx, |remote, _| { - // Wake up all futures currently waiting on a buffer to get opened, - // to give them a chance to fail now that we've disconnected. - remote.remote_buffer_listeners.clear() - }) + if let Some(remote) = self.as_remote_mut() { + // Wake up all futures currently waiting on a buffer to get opened, + // to give them a chance to fail now that we've disconnected. + remote.remote_buffer_listeners.clear() } } @@ -1447,10 +1374,8 @@ impl BufferStore { ) { match event { BufferEvent::FileHandleChanged => { - if let Some(local) = self.state.as_local() { - local.update(cx, |local, cx| { - local.buffer_changed_file(buffer, cx); - }) + if let Some(local) = self.as_local_mut() { + local.buffer_changed_file(buffer, cx); } } BufferEvent::Reloaded => { @@ -1593,13 +1518,13 @@ impl BufferStore { capability: Capability, cx: &mut ModelContext, ) -> Result<()> { - let Some(remote) = self.state.as_remote() else { + let Some(remote) = self.as_remote_mut() else { return Err(anyhow!("buffer store is not a remote")); }; - if let Some(buffer) = remote.update(cx, |remote, cx| { - remote.handle_create_buffer_for_peer(envelope, replica_id, capability, cx) - })? { + if let Some(buffer) = + remote.handle_create_buffer_for_peer(envelope, replica_id, capability, cx)? + { self.add_buffer(buffer, cx)?; } @@ -1616,7 +1541,7 @@ impl BufferStore { this.update(&mut cx, |this, cx| { let payload = envelope.payload.clone(); - if let Some(buffer) = this.get_possibly_incomplete(buffer_id, cx) { + if let Some(buffer) = this.get_possibly_incomplete(buffer_id) { let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?; let worktree = this .worktree_store @@ -1662,7 +1587,7 @@ impl BufferStore { this.update(&mut cx, |this, cx| { let buffer_id = envelope.payload.buffer_id; let buffer_id = BufferId::new(buffer_id)?; - if let Some(buffer) = this.get_possibly_incomplete(buffer_id, cx) { + if let Some(buffer) = this.get_possibly_incomplete(buffer_id) { buffer.update(cx, |buffer, cx| { buffer.set_diff_base(envelope.payload.diff_base.clone(), cx) }); @@ -1756,7 +1681,7 @@ impl BufferStore { let version = deserialize_version(&envelope.payload.version); let mtime = envelope.payload.mtime.clone().map(|time| time.into()); this.update(&mut cx, move |this, cx| { - if let Some(buffer) = this.get_possibly_incomplete(buffer_id, cx) { + if let Some(buffer) = this.get_possibly_incomplete(buffer_id) { buffer.update(cx, |buffer, cx| { buffer.did_save(version, mtime, cx); }); @@ -1788,7 +1713,7 @@ impl BufferStore { .ok_or_else(|| anyhow!("missing line ending"))?, ); this.update(&mut cx, |this, cx| { - if let Some(buffer) = this.get_possibly_incomplete(buffer_id, cx) { + if let Some(buffer) = this.get_possibly_incomplete(buffer_id) { buffer.update(cx, |buffer, cx| { buffer.did_reload(version, line_ending, mtime, cx); }); @@ -1877,8 +1802,10 @@ impl BufferStore { if buffers.is_empty() { return Task::ready(Ok(ProjectTransaction::default())); } - - self.state.reload_buffers(buffers, push_to_history, cx) + match &self.state { + BufferStoreState::Local(this) => this.reload_buffers(buffers, push_to_history, cx), + BufferStoreState::Remote(this) => this.reload_buffers(buffers, push_to_history, cx), + } } async fn handle_reload_buffers( @@ -2000,26 +1927,23 @@ impl BufferStore { self.add_buffer(buffer.clone(), cx).log_err(); let buffer_id = buffer.read(cx).remote_id(); - let local = self - .state - .as_local() + let this = self + .as_local_mut() .expect("local-only method called in a non-local context"); - local.update(cx, |this, cx| { - if let Some(file) = File::from_dyn(buffer.read(cx).file()) { - this.local_buffer_ids_by_path.insert( - ProjectPath { - worktree_id: file.worktree_id(cx), - path: file.path.clone(), - }, - buffer_id, - ); + if let Some(file) = File::from_dyn(buffer.read(cx).file()) { + this.local_buffer_ids_by_path.insert( + ProjectPath { + worktree_id: file.worktree_id(cx), + path: file.path.clone(), + }, + buffer_id, + ); - if let Some(entry_id) = file.entry_id { - this.local_buffer_ids_by_entry_id - .insert(entry_id, buffer_id); - } + if let Some(entry_id) = file.entry_id { + this.local_buffer_ids_by_entry_id + .insert(entry_id, buffer_id); } - }); + } buffer } @@ -2029,10 +1953,8 @@ impl BufferStore { push_to_history: bool, cx: &mut ModelContext, ) -> Task> { - if let Some(remote) = self.state.as_remote() { - remote.update(cx, |remote, cx| { - remote.deserialize_project_transaction(message, push_to_history, cx) - }) + if let Some(this) = self.as_remote_mut() { + this.deserialize_project_transaction(message, push_to_history, cx) } else { debug_panic!("not a remote buffer store"); Task::ready(Err(anyhow!("not a remote buffer store"))) @@ -2040,12 +1962,12 @@ impl BufferStore { } pub fn wait_for_remote_buffer( - &self, + &mut self, id: BufferId, - cx: &mut AppContext, + cx: &mut ModelContext, ) -> Task>> { - if let Some(remote) = self.state.as_remote() { - remote.update(cx, |remote, cx| remote.wait_for_remote_buffer(id, cx)) + if let Some(this) = self.as_remote_mut() { + this.wait_for_remote_buffer(id, cx) } else { debug_panic!("not a remote buffer store"); Task::ready(Err(anyhow!("not a remote buffer store")))