ssh-remoting: Fix go to definition out of worktree (#18094)

Conrad Irwin and Mikayla created

Release Notes:

- ssh-remoting: Fixed go to definition outside of worktree

---------

Co-authored-by: Mikayla <mikayla@zed.dev>

Change summary

crates/client/src/client.rs                  |   4 
crates/project/src/lsp_store.rs              |  21 +-
crates/project/src/project.rs                | 181 +++++++++------------
crates/project/src/worktree_store.rs         | 173 ++++++++++++++++----
crates/remote/src/ssh_session.rs             |   7 
crates/remote_server/src/headless_project.rs |  17 -
crates/rpc/src/proto_client.rs               |   6 
7 files changed, 251 insertions(+), 158 deletions(-)

Detailed changes

crates/client/src/client.rs 🔗

@@ -1621,6 +1621,10 @@ impl ProtoClient for Client {
     fn message_handler_set(&self) -> &parking_lot::Mutex<ProtoMessageHandlerSet> {
         &self.handler_set
     }
+
+    fn goes_via_collab(&self) -> bool {
+        true
+    }
 }
 
 #[derive(Serialize, Deserialize)]

crates/project/src/lsp_store.rs 🔗

@@ -534,6 +534,9 @@ impl LspStore {
             }
             WorktreeStoreEvent::WorktreeRemoved(_, id) => self.remove_worktree(*id, cx),
             WorktreeStoreEvent::WorktreeOrderChanged => {}
+            WorktreeStoreEvent::WorktreeUpdateSent(worktree) => {
+                worktree.update(cx, |worktree, _cx| self.send_diagnostic_summaries(worktree));
+            }
         }
     }
 
@@ -764,24 +767,22 @@ impl LspStore {
         self.active_entry = active_entry;
     }
 
-    pub(crate) fn send_diagnostic_summaries(
-        &self,
-        worktree: &mut Worktree,
-    ) -> Result<(), anyhow::Error> {
+    pub(crate) fn send_diagnostic_summaries(&self, worktree: &mut Worktree) {
         if let Some(client) = self.downstream_client.clone() {
             if let Some(summaries) = self.diagnostic_summaries.get(&worktree.id()) {
                 for (path, summaries) in summaries {
                     for (&server_id, summary) in summaries {
-                        client.send(proto::UpdateDiagnosticSummary {
-                            project_id: self.project_id,
-                            worktree_id: worktree.id().to_proto(),
-                            summary: Some(summary.to_proto(server_id, path)),
-                        })?;
+                        client
+                            .send(proto::UpdateDiagnosticSummary {
+                                project_id: self.project_id,
+                                worktree_id: worktree.id().to_proto(),
+                                summary: Some(summary.to_proto(server_id, path)),
+                            })
+                            .log_err();
                     }
                 }
             }
         }
-        Ok(())
     }
 
     pub fn request_lsp<R: LspCommand>(

crates/project/src/project.rs 🔗

@@ -31,7 +31,7 @@ pub use environment::ProjectEnvironment;
 use futures::{
     channel::mpsc::{self, UnboundedReceiver},
     future::try_join_all,
-    AsyncWriteExt, FutureExt, StreamExt,
+    AsyncWriteExt, StreamExt,
 };
 
 use git::{blame::Blame, repository::GitRepository};
@@ -152,7 +152,7 @@ pub struct Project {
     _subscriptions: Vec<gpui::Subscription>,
     buffers_needing_diff: HashSet<WeakModel<Buffer>>,
     git_diff_debouncer: DebouncedDelay<Self>,
-    remotely_created_buffers: Arc<Mutex<RemotelyCreatedBuffers>>,
+    remotely_created_models: Arc<Mutex<RemotelyCreatedModels>>,
     terminals: Terminals,
     node: Option<Arc<dyn NodeRuntime>>,
     tasks: Model<Inventory>,
@@ -169,26 +169,28 @@ pub struct Project {
 }
 
 #[derive(Default)]
-struct RemotelyCreatedBuffers {
+struct RemotelyCreatedModels {
+    worktrees: Vec<Model<Worktree>>,
     buffers: Vec<Model<Buffer>>,
     retain_count: usize,
 }
 
-struct RemotelyCreatedBufferGuard {
-    remote_buffers: std::sync::Weak<Mutex<RemotelyCreatedBuffers>>,
+struct RemotelyCreatedModelGuard {
+    remote_models: std::sync::Weak<Mutex<RemotelyCreatedModels>>,
 }
 
-impl Drop for RemotelyCreatedBufferGuard {
+impl Drop for RemotelyCreatedModelGuard {
     fn drop(&mut self) {
-        if let Some(remote_buffers) = self.remote_buffers.upgrade() {
-            let mut remote_buffers = remote_buffers.lock();
+        if let Some(remote_models) = self.remote_models.upgrade() {
+            let mut remote_models = remote_models.lock();
             assert!(
-                remote_buffers.retain_count > 0,
-                "RemotelyCreatedBufferGuard dropped too many times"
+                remote_models.retain_count > 0,
+                "RemotelyCreatedModelGuard dropped too many times"
             );
-            remote_buffers.retain_count -= 1;
-            if remote_buffers.retain_count == 0 {
-                remote_buffers.buffers.clear();
+            remote_models.retain_count -= 1;
+            if remote_models.retain_count == 0 {
+                remote_models.buffers.clear();
+                remote_models.worktrees.clear();
             }
         }
     }
@@ -620,7 +622,7 @@ impl Project {
             let snippets =
                 SnippetProvider::new(fs.clone(), BTreeSet::from_iter([global_snippets_dir]), cx);
 
-            let worktree_store = cx.new_model(|_| WorktreeStore::new(false, fs.clone()));
+            let worktree_store = cx.new_model(|_| WorktreeStore::new(None, false, fs.clone()));
             cx.subscribe(&worktree_store, Self::on_worktree_store_event)
                 .detach();
 
@@ -687,7 +689,7 @@ impl Project {
                 dev_server_project_id: None,
                 search_history: Self::new_search_history(),
                 environment,
-                remotely_created_buffers: Default::default(),
+                remotely_created_models: Default::default(),
                 last_formatting_failure: None,
                 buffers_being_formatted: Default::default(),
                 search_included_history: Self::new_search_history(),
@@ -714,11 +716,8 @@ impl Project {
             let snippets =
                 SnippetProvider::new(fs.clone(), BTreeSet::from_iter([global_snippets_dir]), cx);
 
-            let worktree_store = cx.new_model(|_| {
-                let mut worktree_store = WorktreeStore::new(false, fs.clone());
-                worktree_store.set_upstream_client(ssh.clone().into());
-                worktree_store
-            });
+            let worktree_store =
+                cx.new_model(|_| WorktreeStore::new(Some(ssh.clone().into()), false, fs.clone()));
             cx.subscribe(&worktree_store, Self::on_worktree_store_event)
                 .detach();
 
@@ -773,7 +772,7 @@ impl Project {
                 dev_server_project_id: None,
                 search_history: Self::new_search_history(),
                 environment,
-                remotely_created_buffers: Default::default(),
+                remotely_created_models: Default::default(),
                 last_formatting_failure: None,
                 buffers_being_formatted: Default::default(),
                 search_included_history: Self::new_search_history(),
@@ -787,8 +786,9 @@ impl Project {
             ssh.subscribe_to_entity(SSH_PROJECT_ID, &this.worktree_store);
             ssh.subscribe_to_entity(SSH_PROJECT_ID, &this.lsp_store);
             ssh.subscribe_to_entity(SSH_PROJECT_ID, &this.settings_observer);
-            client.add_model_message_handler(Self::handle_update_worktree);
             client.add_model_message_handler(Self::handle_create_buffer_for_peer);
+            client.add_model_message_handler(Self::handle_update_worktree);
+            client.add_model_message_handler(Self::handle_update_project);
             client.add_model_request_handler(BufferStore::handle_update_buffer);
             BufferStore::init(&client);
             LspStore::init(&client);
@@ -867,8 +867,7 @@ impl Project {
         let role = response.payload.role();
 
         let worktree_store = cx.new_model(|_| {
-            let mut store = WorktreeStore::new(true, fs.clone());
-            store.set_upstream_client(client.clone().into());
+            let mut store = WorktreeStore::new(Some(client.clone().into()), true, fs.clone());
             if let Some(dev_server_project_id) = response.payload.dev_server_project_id {
                 store.set_dev_server_project_id(DevServerProjectId(dev_server_project_id));
             }
@@ -955,7 +954,7 @@ impl Project {
                 search_included_history: Self::new_search_history(),
                 search_excluded_history: Self::new_search_history(),
                 environment: ProjectEnvironment::new(&worktree_store, None, cx),
-                remotely_created_buffers: Arc::new(Mutex::new(RemotelyCreatedBuffers::default())),
+                remotely_created_models: Arc::new(Mutex::new(RemotelyCreatedModels::default())),
                 last_formatting_failure: None,
                 buffers_being_formatted: Default::default(),
             };
@@ -1259,43 +1258,6 @@ impl Project {
         }
     }
 
-    fn metadata_changed(&mut self, cx: &mut ModelContext<Self>) {
-        cx.notify();
-
-        let ProjectClientState::Shared { remote_id } = self.client_state else {
-            return;
-        };
-        let project_id = remote_id;
-
-        let update_project = self.client.request(proto::UpdateProject {
-            project_id,
-            worktrees: self.worktree_metadata_protos(cx),
-        });
-        cx.spawn(|this, mut cx| async move {
-            update_project.await?;
-            this.update(&mut cx, |this, cx| {
-                let client = this.client.clone();
-                let worktrees = this.worktree_store.read(cx).worktrees().collect::<Vec<_>>();
-
-                for worktree in worktrees {
-                    worktree.update(cx, |worktree, cx| {
-                        let client = client.clone();
-                        worktree.observe_updates(project_id, cx, {
-                            move |update| client.request(update).map(|result| result.is_ok())
-                        });
-
-                        this.lsp_store.update(cx, |lsp_store, _| {
-                            lsp_store.send_diagnostic_summaries(worktree)
-                        })
-                    })?;
-                }
-
-                anyhow::Ok(())
-            })
-        })
-        .detach_and_log_err(cx);
-    }
-
     pub fn task_inventory(&self) -> &Model<Inventory> {
         &self.tasks
     }
@@ -1513,7 +1475,7 @@ impl Project {
             buffer_store.shared(project_id, self.client.clone().into(), cx)
         });
         self.worktree_store.update(cx, |worktree_store, cx| {
-            worktree_store.set_shared(true, cx);
+            worktree_store.shared(project_id, self.client.clone().into(), cx);
         });
         self.lsp_store.update(cx, |lsp_store, cx| {
             lsp_store.shared(project_id, self.client.clone().into(), cx)
@@ -1526,7 +1488,6 @@ impl Project {
             remote_id: project_id,
         };
 
-        self.metadata_changed(cx);
         cx.emit(Event::RemoteIdChanged(Some(project_id)));
         cx.notify();
         Ok(())
@@ -1540,7 +1501,11 @@ impl Project {
         self.buffer_store
             .update(cx, |buffer_store, _| buffer_store.forget_shared_buffers());
         self.set_collaborators_from_proto(message.collaborators, cx)?;
-        self.metadata_changed(cx);
+
+        self.worktree_store.update(cx, |worktree_store, cx| {
+            worktree_store.send_project_updates(cx);
+        });
+        cx.notify();
         cx.emit(Event::Reshared);
         Ok(())
     }
@@ -1576,7 +1541,6 @@ impl Project {
 
     pub fn unshare(&mut self, cx: &mut ModelContext<Self>) -> Result<()> {
         self.unshare_internal(cx)?;
-        self.metadata_changed(cx);
         cx.notify();
         Ok(())
     }
@@ -1598,7 +1562,7 @@ impl Project {
             self.collaborators.clear();
             self.client_subscriptions.clear();
             self.worktree_store.update(cx, |store, cx| {
-                store.set_shared(false, cx);
+                store.unshared(cx);
             });
             self.buffer_store.update(cx, |buffer_store, cx| {
                 buffer_store.forget_shared_buffers();
@@ -1867,9 +1831,9 @@ impl Project {
         cx: &mut ModelContext<Self>,
     ) -> Result<()> {
         {
-            let mut remotely_created_buffers = self.remotely_created_buffers.lock();
-            if remotely_created_buffers.retain_count > 0 {
-                remotely_created_buffers.buffers.push(buffer.clone())
+            let mut remotely_created_models = self.remotely_created_models.lock();
+            if remotely_created_models.retain_count > 0 {
+                remotely_created_models.buffers.push(buffer.clone())
             }
         }
 
@@ -2110,10 +2074,17 @@ impl Project {
                 cx.emit(Event::WorktreeRemoved(*id));
             }
             WorktreeStoreEvent::WorktreeOrderChanged => cx.emit(Event::WorktreeOrderChanged),
+            WorktreeStoreEvent::WorktreeUpdateSent(_) => {}
         }
     }
 
     fn on_worktree_added(&mut self, worktree: &Model<Worktree>, cx: &mut ModelContext<Self>) {
+        {
+            let mut remotely_created_models = self.remotely_created_models.lock();
+            if remotely_created_models.retain_count > 0 {
+                remotely_created_models.worktrees.push(worktree.clone())
+            }
+        }
         cx.observe(worktree, |_, _, cx| cx.notify()).detach();
         cx.subscribe(worktree, |this, worktree, event, cx| {
             let is_local = worktree.read(cx).is_local();
@@ -2140,7 +2111,7 @@ impl Project {
             }
         })
         .detach();
-        self.metadata_changed(cx);
+        cx.notify();
     }
 
     fn on_worktree_removed(&mut self, id_to_remove: WorktreeId, cx: &mut ModelContext<Self>) {
@@ -2171,7 +2142,7 @@ impl Project {
             inventory.remove_worktree_sources(id_to_remove);
         });
 
-        self.metadata_changed(cx);
+        cx.notify();
     }
 
     fn on_buffer_event(
@@ -3012,7 +2983,7 @@ impl Project {
 
     #[inline(never)]
     fn definition_impl(
-        &self,
+        &mut self,
         buffer: &Model<Buffer>,
         position: PointUtf16,
         cx: &mut ModelContext<Self>,
@@ -3025,7 +2996,7 @@ impl Project {
         )
     }
     pub fn definition<T: ToPointUtf16>(
-        &self,
+        &mut self,
         buffer: &Model<Buffer>,
         position: T,
         cx: &mut ModelContext<Self>,
@@ -3035,7 +3006,7 @@ impl Project {
     }
 
     fn declaration_impl(
-        &self,
+        &mut self,
         buffer: &Model<Buffer>,
         position: PointUtf16,
         cx: &mut ModelContext<Self>,
@@ -3049,7 +3020,7 @@ impl Project {
     }
 
     pub fn declaration<T: ToPointUtf16>(
-        &self,
+        &mut self,
         buffer: &Model<Buffer>,
         position: T,
         cx: &mut ModelContext<Self>,
@@ -3059,7 +3030,7 @@ impl Project {
     }
 
     fn type_definition_impl(
-        &self,
+        &mut self,
         buffer: &Model<Buffer>,
         position: PointUtf16,
         cx: &mut ModelContext<Self>,
@@ -3073,7 +3044,7 @@ impl Project {
     }
 
     pub fn type_definition<T: ToPointUtf16>(
-        &self,
+        &mut self,
         buffer: &Model<Buffer>,
         position: T,
         cx: &mut ModelContext<Self>,
@@ -3083,7 +3054,7 @@ impl Project {
     }
 
     pub fn implementation<T: ToPointUtf16>(
-        &self,
+        &mut self,
         buffer: &Model<Buffer>,
         position: T,
         cx: &mut ModelContext<Self>,
@@ -3098,7 +3069,7 @@ impl Project {
     }
 
     pub fn references<T: ToPointUtf16>(
-        &self,
+        &mut self,
         buffer: &Model<Buffer>,
         position: T,
         cx: &mut ModelContext<Self>,
@@ -3113,7 +3084,7 @@ impl Project {
     }
 
     fn document_highlights_impl(
-        &self,
+        &mut self,
         buffer: &Model<Buffer>,
         position: PointUtf16,
         cx: &mut ModelContext<Self>,
@@ -3127,7 +3098,7 @@ impl Project {
     }
 
     pub fn document_highlights<T: ToPointUtf16>(
-        &self,
+        &mut self,
         buffer: &Model<Buffer>,
         position: T,
         cx: &mut ModelContext<Self>,
@@ -3514,7 +3485,7 @@ impl Project {
             query: Some(query.to_proto()),
             limit: limit as _,
         });
-        let guard = self.retain_remotely_created_buffers(cx);
+        let guard = self.retain_remotely_created_models(cx);
 
         cx.spawn(move |this, mut cx| async move {
             let response = request.await?;
@@ -3536,7 +3507,7 @@ impl Project {
     }
 
     pub fn request_lsp<R: LspCommand>(
-        &self,
+        &mut self,
         buffer_handle: Model<Buffer>,
         server: LanguageServerToQuery,
         request: R,
@@ -3546,8 +3517,14 @@ impl Project {
         <R::LspRequest as lsp::request::Request>::Result: Send,
         <R::LspRequest as lsp::request::Request>::Params: Send,
     {
-        self.lsp_store.update(cx, |lsp_store, cx| {
+        let guard = self.retain_remotely_created_models(cx);
+        let task = self.lsp_store.update(cx, |lsp_store, cx| {
             lsp_store.request_lsp(buffer_handle, server, request, cx)
+        });
+        cx.spawn(|_, _| async move {
+            let result = task.await;
+            drop(guard);
+            result
         })
     }
 
@@ -4095,6 +4072,7 @@ impl Project {
         })?
     }
 
+    // Collab sends UpdateWorktree protos as messages
     async fn handle_update_worktree(
         this: Model<Self>,
         envelope: TypedEnvelope<proto::UpdateWorktree>,
@@ -4130,19 +4108,21 @@ impl Project {
         BufferStore::handle_update_buffer(buffer_store, envelope, cx).await
     }
 
-    fn retain_remotely_created_buffers(
+    fn retain_remotely_created_models(
         &mut self,
         cx: &mut ModelContext<Self>,
-    ) -> RemotelyCreatedBufferGuard {
+    ) -> RemotelyCreatedModelGuard {
         {
-            let mut remotely_created_buffers = self.remotely_created_buffers.lock();
-            if remotely_created_buffers.retain_count == 0 {
-                remotely_created_buffers.buffers = self.buffer_store.read(cx).buffers().collect();
+            let mut remotely_create_models = self.remotely_created_models.lock();
+            if remotely_create_models.retain_count == 0 {
+                remotely_create_models.buffers = self.buffer_store.read(cx).buffers().collect();
+                remotely_create_models.worktrees =
+                    self.worktree_store.read(cx).worktrees().collect();
             }
-            remotely_created_buffers.retain_count += 1;
+            remotely_create_models.retain_count += 1;
         }
-        RemotelyCreatedBufferGuard {
-            remote_buffers: Arc::downgrade(&self.remotely_created_buffers),
+        RemotelyCreatedModelGuard {
+            remote_models: Arc::downgrade(&self.remotely_created_models),
         }
     }
 
@@ -4637,16 +4617,11 @@ impl Project {
         worktrees: Vec<proto::WorktreeMetadata>,
         cx: &mut ModelContext<Project>,
     ) -> Result<()> {
-        self.metadata_changed(cx);
-        self.worktree_store.update(cx, |worktree_store, cx| {
-            worktree_store.set_worktrees_from_proto(
-                worktrees,
-                self.replica_id(),
-                self.remote_id().ok_or_else(|| anyhow!("invalid project"))?,
-                self.client.clone().into(),
-                cx,
-            )
-        })
+        cx.notify();
+        let result = self.worktree_store.update(cx, |worktree_store, cx| {
+            worktree_store.set_worktrees_from_proto(worktrees, self.replica_id(), cx)
+        });
+        result
     }
 
     fn set_collaborators_from_proto(

crates/project/src/worktree_store.rs 🔗

@@ -39,8 +39,10 @@ struct MatchingEntry {
 pub struct WorktreeStore {
     next_entry_id: Arc<AtomicUsize>,
     upstream_client: Option<AnyProtoClient>,
+    downstream_client: Option<AnyProtoClient>,
+    remote_id: u64,
     dev_server_project_id: Option<DevServerProjectId>,
-    is_shared: bool,
+    retain_worktrees: bool,
     worktrees: Vec<WorktreeHandle>,
     worktrees_reordered: bool,
     #[allow(clippy::type_complexity)]
@@ -53,6 +55,7 @@ pub enum WorktreeStoreEvent {
     WorktreeAdded(Model<Worktree>),
     WorktreeRemoved(EntityId, WorktreeId),
     WorktreeOrderChanged,
+    WorktreeUpdateSent(Model<Worktree>),
 }
 
 impl EventEmitter<WorktreeStoreEvent> for WorktreeStore {}
@@ -66,23 +69,25 @@ impl WorktreeStore {
         client.add_model_request_handler(Self::handle_expand_project_entry);
     }
 
-    pub fn new(retain_worktrees: bool, fs: Arc<dyn Fs>) -> Self {
+    pub fn new(
+        upstream_client: Option<AnyProtoClient>,
+        retain_worktrees: bool,
+        fs: Arc<dyn Fs>,
+    ) -> Self {
         Self {
             next_entry_id: Default::default(),
             loading_worktrees: Default::default(),
-            upstream_client: None,
             dev_server_project_id: None,
-            is_shared: retain_worktrees,
+            downstream_client: None,
             worktrees: Vec::new(),
             worktrees_reordered: false,
+            retain_worktrees,
+            remote_id: 0,
+            upstream_client,
             fs,
         }
     }
 
-    pub fn set_upstream_client(&mut self, client: AnyProtoClient) {
-        self.upstream_client = Some(client);
-    }
-
     pub fn set_dev_server_project_id(&mut self, id: DevServerProjectId) {
         self.dev_server_project_id = Some(id);
     }
@@ -201,6 +206,13 @@ impl WorktreeStore {
                     path: abs_path.clone(),
                 })
                 .await?;
+
+            if let Some(existing_worktree) = this.read_with(&cx, |this, cx| {
+                this.worktree_for_id(WorktreeId::from_proto(response.worktree_id), cx)
+            })? {
+                return Ok(existing_worktree);
+            }
+
             let worktree = cx.update(|cx| {
                 Worktree::remote(
                     0,
@@ -302,7 +314,10 @@ impl WorktreeStore {
     }
 
     pub fn add(&mut self, worktree: &Model<Worktree>, cx: &mut ModelContext<Self>) {
-        let push_strong_handle = self.is_shared || worktree.read(cx).is_visible();
+        let worktree_id = worktree.read(cx).id();
+        debug_assert!(!self.worktrees().any(|w| w.read(cx).id() == worktree_id));
+
+        let push_strong_handle = self.retain_worktrees || worktree.read(cx).is_visible();
         let handle = if push_strong_handle {
             WorktreeHandle::Strong(worktree.clone())
         } else {
@@ -322,13 +337,15 @@ impl WorktreeStore {
         }
 
         cx.emit(WorktreeStoreEvent::WorktreeAdded(worktree.clone()));
+        self.send_project_updates(cx);
 
         let handle_id = worktree.entity_id();
-        cx.observe_release(worktree, move |_, worktree, cx| {
+        cx.observe_release(worktree, move |this, worktree, cx| {
             cx.emit(WorktreeStoreEvent::WorktreeRemoved(
                 handle_id,
                 worktree.id(),
             ));
+            this.send_project_updates(cx);
         })
         .detach();
     }
@@ -349,6 +366,7 @@ impl WorktreeStore {
                 false
             }
         });
+        self.send_project_updates(cx);
     }
 
     pub fn set_worktrees_reordered(&mut self, worktrees_reordered: bool) {
@@ -359,8 +377,6 @@ impl WorktreeStore {
         &mut self,
         worktrees: Vec<proto::WorktreeMetadata>,
         replica_id: ReplicaId,
-        remote_id: u64,
-        client: AnyProtoClient,
         cx: &mut ModelContext<Self>,
     ) -> Result<()> {
         let mut old_worktrees_by_id = self
@@ -372,18 +388,31 @@ impl WorktreeStore {
             })
             .collect::<HashMap<_, _>>();
 
+        let client = self
+            .upstream_client
+            .clone()
+            .ok_or_else(|| anyhow!("invalid project"))?;
+
         for worktree in worktrees {
             if let Some(old_worktree) =
                 old_worktrees_by_id.remove(&WorktreeId::from_proto(worktree.id))
             {
-                self.worktrees.push(WorktreeHandle::Strong(old_worktree));
+                let push_strong_handle =
+                    self.retain_worktrees || old_worktree.read(cx).is_visible();
+                let handle = if push_strong_handle {
+                    WorktreeHandle::Strong(old_worktree.clone())
+                } else {
+                    WorktreeHandle::Weak(old_worktree.downgrade())
+                };
+                self.worktrees.push(handle);
             } else {
                 self.add(
-                    &Worktree::remote(remote_id, replica_id, worktree, client.clone(), cx),
+                    &Worktree::remote(self.remote_id, replica_id, worktree, client.clone(), cx),
                     cx,
                 );
             }
         }
+        self.send_project_updates(cx);
 
         Ok(())
     }
@@ -446,33 +475,109 @@ impl WorktreeStore {
         }
     }
 
-    pub fn set_shared(&mut self, is_shared: bool, cx: &mut ModelContext<Self>) {
-        self.is_shared = is_shared;
+    pub fn send_project_updates(&mut self, cx: &mut ModelContext<Self>) {
+        let Some(downstream_client) = self.downstream_client.clone() else {
+            return;
+        };
+        let project_id = self.remote_id;
+
+        let update = proto::UpdateProject {
+            project_id,
+            worktrees: self.worktree_metadata_protos(cx),
+        };
+
+        // collab has bad concurrency guarantees, so we send requests in serial.
+        let update_project = if downstream_client.goes_via_collab() {
+            Some(downstream_client.request(update))
+        } else {
+            downstream_client.send(update).log_err();
+            None
+        };
+        cx.spawn(|this, mut cx| async move {
+            if let Some(update_project) = update_project {
+                update_project.await?;
+            }
+
+            this.update(&mut cx, |this, cx| {
+                let worktrees = this.worktrees().collect::<Vec<_>>();
+
+                for worktree in worktrees {
+                    worktree.update(cx, |worktree, cx| {
+                        let client = downstream_client.clone();
+                        worktree.observe_updates(project_id, cx, {
+                            move |update| {
+                                let client = client.clone();
+                                async move {
+                                    if client.goes_via_collab() {
+                                        client.request(update).map(|result| result.is_ok()).await
+                                    } else {
+                                        client.send(update).is_ok()
+                                    }
+                                }
+                            }
+                        });
+                    });
+
+                    cx.emit(WorktreeStoreEvent::WorktreeUpdateSent(worktree.clone()))
+                }
+
+                anyhow::Ok(())
+            })
+        })
+        .detach_and_log_err(cx);
+    }
+
+    pub fn worktree_metadata_protos(&self, cx: &AppContext) -> Vec<proto::WorktreeMetadata> {
+        self.worktrees()
+            .map(|worktree| {
+                let worktree = worktree.read(cx);
+                proto::WorktreeMetadata {
+                    id: worktree.id().to_proto(),
+                    root_name: worktree.root_name().into(),
+                    visible: worktree.is_visible(),
+                    abs_path: worktree.abs_path().to_string_lossy().into(),
+                }
+            })
+            .collect()
+    }
+
+    pub fn shared(
+        &mut self,
+        remote_id: u64,
+        downsteam_client: AnyProtoClient,
+        cx: &mut ModelContext<Self>,
+    ) {
+        self.retain_worktrees = true;
+        self.remote_id = remote_id;
+        self.downstream_client = Some(downsteam_client);
 
         // When shared, retain all worktrees
-        if is_shared {
-            for worktree_handle in self.worktrees.iter_mut() {
-                match worktree_handle {
-                    WorktreeHandle::Strong(_) => {}
-                    WorktreeHandle::Weak(worktree) => {
-                        if let Some(worktree) = worktree.upgrade() {
-                            *worktree_handle = WorktreeHandle::Strong(worktree);
-                        }
+        for worktree_handle in self.worktrees.iter_mut() {
+            match worktree_handle {
+                WorktreeHandle::Strong(_) => {}
+                WorktreeHandle::Weak(worktree) => {
+                    if let Some(worktree) = worktree.upgrade() {
+                        *worktree_handle = WorktreeHandle::Strong(worktree);
                     }
                 }
             }
         }
+        self.send_project_updates(cx);
+    }
+
+    pub fn unshared(&mut self, cx: &mut ModelContext<Self>) {
+        self.retain_worktrees = false;
+        self.downstream_client.take();
+
         // When not shared, only retain the visible worktrees
-        else {
-            for worktree_handle in self.worktrees.iter_mut() {
-                if let WorktreeHandle::Strong(worktree) = worktree_handle {
-                    let is_visible = worktree.update(cx, |worktree, _| {
-                        worktree.stop_observing_updates();
-                        worktree.is_visible()
-                    });
-                    if !is_visible {
-                        *worktree_handle = WorktreeHandle::Weak(worktree.downgrade());
-                    }
+        for worktree_handle in self.worktrees.iter_mut() {
+            if let WorktreeHandle::Strong(worktree) = worktree_handle {
+                let is_visible = worktree.update(cx, |worktree, _| {
+                    worktree.stop_observing_updates();
+                    worktree.is_visible()
+                });
+                if !is_visible {
+                    *worktree_handle = WorktreeHandle::Weak(worktree.downgrade());
                 }
             }
         }

crates/remote/src/ssh_session.rs 🔗

@@ -247,7 +247,8 @@ impl SshSession {
                                     let line_ix = start_ix + ix;
                                     let content = &stderr_buffer[start_ix..line_ix];
                                     start_ix = line_ix + 1;
-                                    if let Ok(record) = serde_json::from_slice::<LogRecord>(content) {
+                                    if let Ok(mut record) = serde_json::from_slice::<LogRecord>(content) {
+                                        record.message = format!("(remote) {}", record.message);
                                         record.log(log::logger())
                                     } else {
                                         eprintln!("(remote) {}", String::from_utf8_lossy(content));
@@ -469,6 +470,10 @@ impl ProtoClient for SshSession {
     fn message_handler_set(&self) -> &Mutex<ProtoMessageHandlerSet> {
         &self.state
     }
+
+    fn goes_via_collab(&self) -> bool {
+        false
+    }
 }
 
 impl SshClientState {

crates/remote_server/src/headless_project.rs 🔗

@@ -44,7 +44,11 @@ impl HeadlessProject {
     pub fn new(session: Arc<SshSession>, fs: Arc<dyn Fs>, cx: &mut ModelContext<Self>) -> Self {
         let languages = Arc::new(LanguageRegistry::new(cx.background_executor().clone()));
 
-        let worktree_store = cx.new_model(|_| WorktreeStore::new(true, fs.clone()));
+        let worktree_store = cx.new_model(|cx| {
+            let mut store = WorktreeStore::new(None, true, fs.clone());
+            store.shared(SSH_PROJECT_ID, session.clone().into(), cx);
+            store
+        });
         let buffer_store = cx.new_model(|cx| {
             let mut buffer_store =
                 BufferStore::new(worktree_store.clone(), Some(SSH_PROJECT_ID), cx);
@@ -196,18 +200,11 @@ impl HeadlessProject {
             .await?;
 
         this.update(&mut cx, |this, cx| {
-            let session = this.session.clone();
             this.worktree_store.update(cx, |worktree_store, cx| {
                 worktree_store.add(&worktree, cx);
             });
-            worktree.update(cx, |worktree, cx| {
-                worktree.observe_updates(0, cx, move |update| {
-                    session.send(update).ok();
-                    futures::future::ready(true)
-                });
-                proto::AddWorktreeResponse {
-                    worktree_id: worktree.id().to_proto(),
-                }
+            worktree.update(cx, |worktree, _| proto::AddWorktreeResponse {
+                worktree_id: worktree.id().to_proto(),
             })
         })
     }

crates/rpc/src/proto_client.rs 🔗

@@ -27,6 +27,8 @@ pub trait ProtoClient: Send + Sync {
     fn send_response(&self, envelope: Envelope, message_type: &'static str) -> anyhow::Result<()>;
 
     fn message_handler_set(&self) -> &parking_lot::Mutex<ProtoMessageHandlerSet>;
+
+    fn goes_via_collab(&self) -> bool;
 }
 
 #[derive(Default)]
@@ -139,6 +141,10 @@ impl AnyProtoClient {
         Self(client)
     }
 
+    pub fn goes_via_collab(&self) -> bool {
+        self.0.goes_via_collab()
+    }
+
     pub fn request<T: RequestMessage>(
         &self,
         request: T,