Remove shared_worktrees map from ClientState

Nathan Sobo created

Each worktree instance now handles its own messages.

Change summary

zed/src/main.rs     |   8 -
zed/src/rpc.rs      |  25 -----
zed/src/worktree.rs | 196 ----------------------------------------------
3 files changed, 4 insertions(+), 225 deletions(-)

Detailed changes

zed/src/main.rs 🔗

@@ -11,7 +11,6 @@ use zed::{
     fs::RealFs,
     language, menus, rpc, settings, theme_selector,
     workspace::{self, OpenParams},
-    worktree::{self},
     AppState,
 };
 use zrpc::ForegroundRouter;
@@ -27,7 +26,7 @@ fn main() {
     let languages = Arc::new(language::LanguageRegistry::new());
     languages.set_theme(&settings.borrow().theme);
 
-    let mut app_state = AppState {
+    let app_state = AppState {
         languages: languages.clone(),
         settings_tx: Arc::new(Mutex::new(settings_tx)),
         settings,
@@ -38,11 +37,6 @@ fn main() {
     };
 
     app.run(move |cx| {
-        worktree::init(
-            cx,
-            &app_state.rpc,
-            Arc::get_mut(&mut app_state.rpc_router).unwrap(),
-        );
         let app_state = Arc::new(app_state);
 
         zed::init(cx);

zed/src/rpc.rs 🔗

@@ -1,12 +1,11 @@
-use crate::{language::LanguageRegistry, worktree::Worktree};
+use crate::language::LanguageRegistry;
 use anyhow::{anyhow, Context, Result};
 use async_tungstenite::tungstenite::http::Request;
 use async_tungstenite::tungstenite::{Error as WebSocketError, Message as WebSocketMessage};
 use futures::StreamExt;
-use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle};
+use gpui::{AsyncAppContext, Entity, ModelContext, Task};
 use lazy_static::lazy_static;
 use smol::lock::RwLock;
-use std::collections::HashMap;
 use std::time::Duration;
 use std::{convert::TryFrom, future::Future, sync::Arc};
 use surf::Url;
@@ -30,35 +29,15 @@ pub struct Client {
 
 pub struct ClientState {
     connection_id: Option<ConnectionId>,
-    pub shared_worktrees: HashMap<u64, WeakModelHandle<Worktree>>,
     pub languages: Arc<LanguageRegistry>,
 }
 
-impl ClientState {
-    pub fn shared_worktree(
-        &self,
-        id: u64,
-        cx: &mut AsyncAppContext,
-    ) -> Result<ModelHandle<Worktree>> {
-        if let Some(worktree) = self.shared_worktrees.get(&id) {
-            if let Some(worktree) = cx.read(|cx| worktree.upgrade(cx)) {
-                Ok(worktree)
-            } else {
-                Err(anyhow!("worktree {} was dropped", id))
-            }
-        } else {
-            Err(anyhow!("worktree {} does not exist", id))
-        }
-    }
-}
-
 impl Client {
     pub fn new(languages: Arc<LanguageRegistry>) -> Self {
         Self {
             peer: Peer::new(),
             state: Arc::new(RwLock::new(ClientState {
                 connection_id: None,
-                shared_worktrees: Default::default(),
                 languages,
             })),
         }

zed/src/worktree.rs 🔗

@@ -42,23 +42,12 @@ use std::{
     },
     time::{Duration, SystemTime},
 };
-use zrpc::{ForegroundRouter, PeerId, TypedEnvelope};
+use zrpc::{PeerId, TypedEnvelope};
 
 lazy_static! {
     static ref GITIGNORE: &'static OsStr = OsStr::new(".gitignore");
 }
 
-pub fn init(cx: &mut MutableAppContext, rpc: &rpc::Client, router: &mut ForegroundRouter) {
-    rpc.on_message(router, remote::add_peer, cx);
-    rpc.on_message(router, remote::remove_peer, cx);
-    rpc.on_message(router, remote::update_worktree, cx);
-    rpc.on_message(router, remote::open_buffer, cx);
-    rpc.on_message(router, remote::close_buffer, cx);
-    rpc.on_message(router, remote::update_buffer, cx);
-    rpc.on_message(router, remote::buffer_saved, cx);
-    rpc.on_message(router, remote::save_buffer, cx);
-}
-
 #[derive(Clone, Debug)]
 enum ScanState {
     Idle,
@@ -85,11 +74,6 @@ impl Entity for Worktree {
 
         if let Some((rpc, worktree_id)) = rpc {
             cx.spawn(|_| async move {
-                rpc.state
-                    .write()
-                    .await
-                    .shared_worktrees
-                    .remove(&worktree_id);
                 if let Err(err) = rpc.send(proto::CloseWorktree { worktree_id }).await {
                     log::error!("error closing worktree {}: {}", worktree_id, err);
                 }
@@ -254,11 +238,6 @@ impl Worktree {
                 })
             })
         });
-        rpc.state
-            .write()
-            .await
-            .shared_worktrees
-            .insert(open_response.worktree_id, worktree.downgrade());
 
         Ok(worktree)
     }
@@ -989,18 +968,11 @@ impl LocalWorktree {
     ) -> Task<anyhow::Result<(u64, String)>> {
         let snapshot = self.snapshot();
         let share_request = self.share_request(cx);
-        let handle = cx.handle();
         cx.spawn(|this, mut cx| async move {
             let share_request = share_request.await;
             let share_response = rpc.request(share_request).await?;
             let remote_id = share_response.worktree_id;
 
-            rpc.state
-                .write()
-                .await
-                .shared_worktrees
-                .insert(share_response.worktree_id, handle.downgrade());
-
             log::info!("sharing worktree {:?}", share_response);
             let (snapshots_to_send_tx, snapshots_to_send_rx) =
                 smol::channel::unbounded::<Snapshot>();
@@ -2531,172 +2503,6 @@ impl<'a> TryFrom<(&'a CharBag, proto::Entry)> for Entry {
     }
 }
 
-mod remote {
-    use super::*;
-
-    pub async fn add_peer(
-        envelope: TypedEnvelope<proto::AddPeer>,
-        rpc: &rpc::Client,
-        cx: &mut AsyncAppContext,
-    ) -> anyhow::Result<()> {
-        rpc.state
-            .read()
-            .await
-            .shared_worktree(envelope.payload.worktree_id, cx)?
-            .update(cx, |worktree, cx| {
-                worktree.handle_add_peer(&envelope, rpc.clone(), cx)
-            })
-    }
-
-    pub async fn remove_peer(
-        envelope: TypedEnvelope<proto::RemovePeer>,
-        rpc: &rpc::Client,
-        cx: &mut AsyncAppContext,
-    ) -> anyhow::Result<()> {
-        rpc.state
-            .read()
-            .await
-            .shared_worktree(envelope.payload.worktree_id, cx)?
-            .update(cx, |worktree, cx| {
-                worktree.handle_remove_peer(&envelope, rpc.clone(), cx)
-            })
-    }
-
-    pub async fn update_worktree(
-        envelope: TypedEnvelope<proto::UpdateWorktree>,
-        rpc: &rpc::Client,
-        cx: &mut AsyncAppContext,
-    ) -> anyhow::Result<()> {
-        rpc.state
-            .read()
-            .await
-            .shared_worktree(envelope.payload.worktree_id, cx)?
-            .update(cx, |worktree, _| {
-                if let Some(worktree) = worktree.as_remote_mut() {
-                    let mut tx = worktree.updates_tx.clone();
-                    Ok(async move {
-                        tx.send(envelope.payload)
-                            .await
-                            .expect("receiver runs to completion");
-                    })
-                } else {
-                    Err(anyhow!(
-                        "invalid update message for local worktree {}",
-                        envelope.payload.worktree_id
-                    ))
-                }
-            })?
-            .await;
-
-        Ok(())
-    }
-
-    pub async fn open_buffer(
-        envelope: TypedEnvelope<proto::OpenBuffer>,
-        rpc: &rpc::Client,
-        cx: &mut AsyncAppContext,
-    ) -> anyhow::Result<()> {
-        let receipt = envelope.receipt();
-        let worktree = rpc
-            .state
-            .read()
-            .await
-            .shared_worktree(envelope.payload.worktree_id, cx)?;
-
-        let response = worktree
-            .update(cx, |worktree, cx| {
-                worktree
-                    .as_local_mut()
-                    .unwrap()
-                    .open_remote_buffer(&envelope, cx)
-            })
-            .await?;
-
-        rpc.respond(receipt, response).await?;
-
-        Ok(())
-    }
-
-    pub async fn close_buffer(
-        envelope: TypedEnvelope<proto::CloseBuffer>,
-        rpc: &rpc::Client,
-        cx: &mut AsyncAppContext,
-    ) -> anyhow::Result<()> {
-        let worktree = rpc
-            .state
-            .read()
-            .await
-            .shared_worktree(envelope.payload.worktree_id, cx)?;
-
-        worktree.update(cx, |worktree, cx| {
-            worktree
-                .as_local_mut()
-                .unwrap()
-                .close_remote_buffer(&envelope, cx)
-        })
-    }
-
-    pub async fn update_buffer(
-        envelope: TypedEnvelope<proto::UpdateBuffer>,
-        rpc: &rpc::Client,
-        cx: &mut AsyncAppContext,
-    ) -> anyhow::Result<()> {
-        rpc.state
-            .read()
-            .await
-            .shared_worktree(envelope.payload.worktree_id, cx)?
-            .update(cx, |tree, cx| {
-                tree.handle_update_buffer(&envelope, rpc.clone(), cx)
-            })?;
-        Ok(())
-    }
-
-    pub async fn save_buffer(
-        envelope: TypedEnvelope<proto::SaveBuffer>,
-        rpc: &rpc::Client,
-        cx: &mut AsyncAppContext,
-    ) -> anyhow::Result<()> {
-        let state = rpc.state.read().await;
-        let worktree = state.shared_worktree(envelope.payload.worktree_id, cx)?;
-        let sender_id = envelope.original_sender_id()?;
-        let buffer = worktree.read_with(cx, |tree, _| {
-            tree.as_local()
-                .unwrap()
-                .shared_buffers
-                .get(&sender_id)
-                .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned())
-                .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))
-        })?;
-        let (version, mtime) = buffer.update(cx, |buffer, cx| buffer.save(cx))?.await?;
-        rpc.respond(
-            envelope.receipt(),
-            proto::BufferSaved {
-                worktree_id: envelope.payload.worktree_id,
-                buffer_id: envelope.payload.buffer_id,
-                version: (&version).into(),
-                mtime: Some(mtime.into()),
-            },
-        )
-        .await?;
-        Ok(())
-    }
-
-    pub async fn buffer_saved(
-        envelope: TypedEnvelope<proto::BufferSaved>,
-        rpc: &rpc::Client,
-        cx: &mut AsyncAppContext,
-    ) -> anyhow::Result<()> {
-        rpc.state
-            .read()
-            .await
-            .shared_worktree(envelope.payload.worktree_id, cx)?
-            .update(cx, |worktree, cx| {
-                worktree.handle_buffer_saved(&envelope, rpc.clone(), cx)
-            })?;
-        Ok(())
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;