From f4d6fbe14f64a63f44bcfba93db5cbaad0b654b0 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Mon, 21 Jun 2021 11:49:50 +0200 Subject: [PATCH] Issue only one `OpenFile` and `CloseFile` request per handle --- zed/src/editor/buffer.rs | 6 +- zed/src/rpc.rs | 9 +++ zed/src/workspace.rs | 3 +- zed/src/worktree.rs | 116 +++++++++++++++++++++++++++++---------- 4 files changed, 99 insertions(+), 35 deletions(-) diff --git a/zed/src/editor/buffer.rs b/zed/src/editor/buffer.rs index 1ca98c7298f912e8ecf3d091fa7af7e7d0077b45..776b83dbfca16c61d41bb076ac30eafaa3778fd9 100644 --- a/zed/src/editor/buffer.rs +++ b/zed/src/editor/buffer.rs @@ -32,7 +32,7 @@ use std::{ ops::{Deref, DerefMut, Range}, str, sync::Arc, - time::{Duration, Instant, SystemTime, UNIX_EPOCH}, + time::{Duration, Instant}, }; const UNDO_GROUP_INTERVAL: Duration = Duration::from_millis(300); @@ -110,7 +110,7 @@ pub struct Buffer { deleted_text: Rope, pub version: time::Global, saved_version: time::Global, - saved_mtime: SystemTime, + saved_mtime: Duration, last_edit: time::Local, undo_map: UndoMap, history: History, @@ -467,7 +467,7 @@ impl Buffer { cx.emit(Event::FileHandleChanged); }); } else { - saved_mtime = UNIX_EPOCH; + saved_mtime = Duration::ZERO; } let mut fragments = SumTree::new(); diff --git a/zed/src/rpc.rs b/zed/src/rpc.rs index f4e8f43834297a8bc9510fa3c5a5c953df6f9667..4ffe5290abb2a0b4d73b2930ab8d16b949471d17 100644 --- a/zed/src/rpc.rs +++ b/zed/src/rpc.rs @@ -11,6 +11,7 @@ use std::collections::{HashMap, HashSet}; use std::time::Duration; use std::{convert::TryFrom, future::Future, sync::Arc}; use surf::Url; +use zed_rpc::proto::EnvelopedMessage; use zed_rpc::{proto::RequestMessage, rest, Peer, TypedEnvelope}; use zed_rpc::{PeerId, Receipt}; @@ -191,6 +192,14 @@ impl Client { }) } + pub fn send( + &self, + connection_id: ConnectionId, + message: T, + ) -> impl Future> { + self.peer.send(connection_id, message) + } + pub fn request( &self, connection_id: ConnectionId, diff --git a/zed/src/workspace.rs b/zed/src/workspace.rs index 0db3716a56a6dc2264f60317cb8ebd9d2324d43b..914441e01bb0412ddb1074ae586624ed34dfff2d 100644 --- a/zed/src/workspace.rs +++ b/zed/src/workspace.rs @@ -27,7 +27,6 @@ use std::{ future::Future, path::{Path, PathBuf}, sync::Arc, - time::UNIX_EPOCH, }; use zed_rpc::{proto, TypedEnvelope}; @@ -131,7 +130,7 @@ mod remote { let file = cx.update(|cx| worktree.file(&message.path, cx)).await?; let id = file.id() as u64; - let mtime = file.mtime().duration_since(UNIX_EPOCH).unwrap().as_secs(); + let mtime = file.mtime().as_secs(); *state .shared_files diff --git a/zed/src/worktree.rs b/zed/src/worktree.rs index 576e5b294a3bd674902b41531237b60427038dcd..3e90db2838433d0449903e4d0d740d678d302d9f 100644 --- a/zed/src/worktree.rs +++ b/zed/src/worktree.rs @@ -19,7 +19,7 @@ use postage::{ prelude::{Sink, Stream}, watch, }; -use smol::channel::Sender; +use smol::{channel::Sender, lock::Mutex as AsyncMutex}; use std::{ cmp, collections::{HashMap, HashSet}, @@ -32,10 +32,10 @@ use std::{ os::unix::{ffi::OsStrExt, fs::MetadataExt}, path::{Path, PathBuf}, sync::{ - atomic::{AtomicUsize, Ordering::SeqCst}, + atomic::{AtomicU64, Ordering::SeqCst}, Arc, Weak, }, - time::{Duration, SystemTime}, + time::{Duration, UNIX_EPOCH}, }; lazy_static! { @@ -135,25 +135,40 @@ pub struct LocalWorktree { snapshot: Snapshot, background_snapshot: Arc>, handles: Arc, Weak>>>>, - next_handle_id: AtomicUsize, + next_handle_id: AtomicU64, scan_state: (watch::Sender, watch::Receiver), _event_stream_handle: fsevent::Handle, poll_scheduled: bool, rpc: Option, } -#[derive(Clone, Debug)] +#[derive(Clone)] pub struct FileHandle { worktree: ModelHandle, state: Arc>, } -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone)] struct FileHandleState { path: Arc, is_deleted: bool, - mtime: SystemTime, - id: usize, + mtime: Duration, + id: u64, + rpc: Option<(ConnectionId, rpc::Client)>, +} + +impl Drop for FileHandleState { + fn drop(&mut self) { + if let Some((connection_id, rpc)) = self.rpc.take() { + let id = self.id; + smol::spawn(async move { + if let Err(error) = rpc.send(connection_id, proto::CloseFile { id }).await { + log::warn!("error closing file {}: {}", id, error); + } + }) + .detach(); + } + } } impl LocalWorktree { @@ -315,7 +330,7 @@ impl LocalWorktree { if let Some(handle) = handles.lock().get(&*path).and_then(Weak::upgrade) { let mut handle = handle.lock(); - handle.mtime = file.metadata()?.modified()?; + handle.mtime = file.metadata()?.modified()?.duration_since(UNIX_EPOCH)?; handle.is_deleted = false; } @@ -385,7 +400,7 @@ impl fmt::Debug for LocalWorktree { pub struct RemoteWorktree { id: usize, snapshot: Snapshot, - handles: Arc, Weak>>>>, + handles: Arc, Arc>>>>>>, rpc: rpc::Client, connection_id: ConnectionId, } @@ -622,7 +637,7 @@ impl fmt::Debug for Snapshot { } impl FileHandle { - pub fn id(&self) -> usize { + pub fn id(&self) -> u64 { self.state.lock().id } @@ -646,7 +661,7 @@ impl FileHandle { self.state.lock().is_deleted } - pub fn mtime(&self) -> SystemTime { + pub fn mtime(&self) -> Duration { self.state.lock().mtime } @@ -681,7 +696,9 @@ impl FileHandle { cx.observe(&self.worktree, move |observer, worktree, cx| { if let Some(cur_state) = cur_state.upgrade() { let cur_state_unlocked = cur_state.lock(); - if *cur_state_unlocked != prev_state { + if cur_state_unlocked.mtime != prev_state.mtime + || cur_state_unlocked.path != prev_state.path + { prev_state = cur_state_unlocked.clone(); drop(cur_state_unlocked); callback( @@ -1181,11 +1198,19 @@ impl BackgroundScanner { let mut state = state.lock(); if state.path.as_ref() == path { if let Ok(metadata) = &metadata { - state.mtime = metadata.modified().unwrap(); + state.mtime = metadata + .modified() + .unwrap() + .duration_since(UNIX_EPOCH) + .unwrap(); } } else if state.path.starts_with(path) { if let Ok(metadata) = fs::metadata(state.path.as_ref()) { - state.mtime = metadata.modified().unwrap(); + state.mtime = metadata + .modified() + .unwrap() + .duration_since(UNIX_EPOCH) + .unwrap(); } } } @@ -1441,7 +1466,8 @@ impl WorktreeHandle for ModelHandle { .background_executor() .spawn(async move { fs::metadata(&abs_path) }) .await? - .modified()?; + .modified()? + .duration_since(UNIX_EPOCH)?; let state = handle.read_with(&cx, |tree, _| { let mut handles = tree.as_local().unwrap().handles.lock(); handles @@ -1456,18 +1482,20 @@ impl WorktreeHandle for ModelHandle { is_deleted: false, mtime, id, + rpc: None, } } else { FileHandleState { path: path.clone(), - is_deleted: !tree.path_is_pending(path), + is_deleted: !tree.path_is_pending(&path), mtime, id, + rpc: None, } }; let state = Arc::new(Mutex::new(handle_state.clone())); - handles.insert(handle_state.path, Arc::downgrade(&state)); + handles.insert(path, Arc::downgrade(&state)); state }) }); @@ -1481,18 +1509,46 @@ impl WorktreeHandle for ModelHandle { let worktree_id = tree.id; let connection_id = tree.connection_id; let rpc = tree.rpc.clone(); + let handles = tree.handles.clone(); cx.spawn(|cx| async move { - let response = rpc - .request( - connection_id, - proto::OpenFile { - worktree_id: worktree_id as u64, - path: path.to_string_lossy().to_string(), - }, - ) - .await?; - - todo!() + let state = handles + .lock() + .entry(path.clone()) + .or_insert_with(|| Arc::new(AsyncMutex::new(Weak::new()))) + .clone(); + + let mut state = state.lock().await; + if let Some(state) = Weak::upgrade(&state) { + Ok(FileHandle { + worktree: handle, + state, + }) + } else { + let response = rpc + .request( + connection_id, + proto::OpenFile { + worktree_id: worktree_id as u64, + path: path.to_string_lossy().to_string(), + }, + ) + .await?; + let is_deleted = handle.read_with(&cx, |tree, _| { + tree.entry_for_path(&path).is_some() || !tree.path_is_pending(&path) + }); + let new_state = Arc::new(Mutex::new(FileHandleState { + path, + is_deleted, + mtime: Duration::from_secs(response.mtime), + id: response.id, + rpc: Some((connection_id, rpc)), + })); + *state = Arc::downgrade(&new_state); + Ok(FileHandle { + worktree: handle, + state: new_state, + }) + } }) } } @@ -1646,7 +1702,7 @@ mod tests { use std::env; use std::fmt::Write; use std::os::unix; - use std::time::{SystemTime, UNIX_EPOCH}; + use std::time::SystemTime; #[gpui::test] async fn test_populate_and_search(mut cx: gpui::TestAppContext) {