Fix crash in collab when sending worktree updates (#19678)

Created by Antonio Scandurra, Thorsten, Bennet, and Kirill

This pull request does three things:

- In 29c2df73e1448bb510aebfbc954deb067fc88032, we introduced a safety
guard that prevents this crash from happening again by returning an
error instead of panicking when the payload is too large.
- In 3e7a2e5c3067c53bb12cd0c76c6e7b09af7c8fcf, we introduced chunking
for updates coming from SSH servers (previously, we sent the whole
changeset and the initial set of paths in their entirety).
- In 122b5b4, we introduced a panic hook that sends panics to Axiom.

For posterity, this is how we figured out what the panic was:

```
kubectl logs current-pod-name --previous --namespace=production
```
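
The `--previous` flag returns the logs from the prior, crashed instance of the container, which is where the panic message and backtrace ended up.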

Release Notes:

- N/A

---------

Co-authored-by: Thorsten <thorsten@zed.dev>
Co-authored-by: Bennet <bennet@zed.dev>
Co-authored-by: Kirill <kirill@zed.dev>

Change summary

crates/collab/src/db/queries/projects.rs | 10 ++++++++++
crates/collab/src/main.rs                | 19 +++++++++++++++++++
crates/collab/src/rpc.rs                 | 14 ++------------
crates/proto/src/proto.rs                | 20 ++++++++++++++------
crates/worktree/src/worktree.rs          | 22 ++++++++++++----------
5 files changed, 57 insertions(+), 28 deletions(-)

Detailed changes

crates/collab/src/db/queries/projects.rs

@@ -272,6 +272,16 @@ impl Database {
         update: &proto::UpdateWorktree,
         connection: ConnectionId,
     ) -> Result<TransactionGuard<Vec<ConnectionId>>> {
+        if update.removed_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
+            || update.updated_entries.len() > proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE
+        {
+            return Err(anyhow!(
+                "invalid worktree update. removed entries: {}, updated entries: {}",
+                update.removed_entries.len(),
+                update.updated_entries.len()
+            ))?;
+        }
+
         let project_id = ProjectId::from_proto(update.project_id);
         let worktree_id = update.worktree_id as i64;
         self.project_transaction(project_id, |tx| async move {
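
For context, a minimal sketch of how the guard behaves; the method name, `db`, and `connection_id` are illustrative test stand-ins, not part of the diff:

```rust
// Hypothetical sketch: an oversized update is now rejected with an error
// instead of tripping the old panic further down the stack.
let mut update = proto::UpdateWorktree::default();
update.updated_entries =
    vec![proto::Entry::default(); proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE + 1];

let result = db.update_worktree(&update, connection_id).await;
assert!(result.is_err());
```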

crates/collab/src/main.rs

@@ -84,6 +84,8 @@ async fn main() -> Result<()> {
 
             let config = envy::from_env::<Config>().expect("error loading config");
             init_tracing(&config);
+            init_panic_hook();
+
             let mut app = Router::new()
                 .route("/", get(handle_root))
                 .route("/healthz", get(handle_liveness_probe))
@@ -378,3 +380,20 @@ pub fn init_tracing(config: &Config) -> Option<()> {
 
     None
 }
+
+fn init_panic_hook() {
+    std::panic::set_hook(Box::new(move |panic_info| {
+        let panic_message = match panic_info.payload().downcast_ref::<&'static str>() {
+            Some(message) => *message,
+            None => match panic_info.payload().downcast_ref::<String>() {
+                Some(message) => message.as_str(),
+                None => "Box<Any>",
+            },
+        };
+        let backtrace = std::backtrace::Backtrace::force_capture();
+        let location = panic_info
+            .location()
+            .map(|loc| format!("{}:{}", loc.file(), loc.line()));
+        tracing::error!(panic = true, ?location, %panic_message, %backtrace, "Server Panic");
+    }));
+}
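
Because the hook logs through `tracing`, the panic reaches Axiom via the existing tracing pipeline. A minimal sketch of exercising it locally (the subscriber setup is illustrative; in the server, `init_tracing` handles this):

```rust
fn main() {
    // Illustrative: any tracing subscriber will do for a local check.
    tracing_subscriber::fmt().init();
    init_panic_hook();

    // The hook runs at the start of unwinding, so the "Server Panic" event
    // (message, location, backtrace) is emitted before catch_unwind returns.
    let _ = std::panic::catch_unwind(|| panic!("boom"));
}
```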

crates/collab/src/rpc.rs

@@ -1713,11 +1713,6 @@ fn notify_rejoined_projects(
 
     for project in rejoined_projects {
         for worktree in mem::take(&mut project.worktrees) {
-            #[cfg(any(test, feature = "test-support"))]
-            const MAX_CHUNK_SIZE: usize = 2;
-            #[cfg(not(any(test, feature = "test-support")))]
-            const MAX_CHUNK_SIZE: usize = 256;
-
             // Stream this worktree's entries.
             let message = proto::UpdateWorktree {
                 project_id: project.id.to_proto(),
@@ -1731,7 +1726,7 @@ fn notify_rejoined_projects(
                 updated_repositories: worktree.updated_repositories,
                 removed_repositories: worktree.removed_repositories,
             };
-            for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
+            for update in proto::split_worktree_update(message) {
                 session.peer.send(session.connection_id, update.clone())?;
             }
 
@@ -2195,11 +2190,6 @@ fn join_project_internal(
     })?;
 
     for (worktree_id, worktree) in mem::take(&mut project.worktrees) {
-        #[cfg(any(test, feature = "test-support"))]
-        const MAX_CHUNK_SIZE: usize = 2;
-        #[cfg(not(any(test, feature = "test-support")))]
-        const MAX_CHUNK_SIZE: usize = 256;
-
         // Stream this worktree's entries.
         let message = proto::UpdateWorktree {
             project_id: project_id.to_proto(),
@@ -2213,7 +2203,7 @@ fn join_project_internal(
             updated_repositories: worktree.repository_entries.into_values().collect(),
             removed_repositories: Default::default(),
         };
-        for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
+        for update in proto::split_worktree_update(message) {
             session.peer.send(session.connection_id, update.clone())?;
         }
 

crates/proto/src/proto.rs

@@ -630,10 +630,12 @@ impl From<Nonce> for u128 {
     }
 }
 
-pub fn split_worktree_update(
-    mut message: UpdateWorktree,
-    max_chunk_size: usize,
-) -> impl Iterator<Item = UpdateWorktree> {
+#[cfg(any(test, feature = "test-support"))]
+pub const MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE: usize = 2;
+#[cfg(not(any(test, feature = "test-support")))]
+pub const MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE: usize = 256;
+
+pub fn split_worktree_update(mut message: UpdateWorktree) -> impl Iterator<Item = UpdateWorktree> {
     let mut done_files = false;
 
     let mut repository_map = message
@@ -647,13 +649,19 @@ pub fn split_worktree_update(
             return None;
         }
 
-        let updated_entries_chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
+        let updated_entries_chunk_size = cmp::min(
+            message.updated_entries.len(),
+            MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE,
+        );
         let updated_entries: Vec<_> = message
             .updated_entries
             .drain(..updated_entries_chunk_size)
             .collect();
 
-        let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size);
+        let removed_entries_chunk_size = cmp::min(
+            message.removed_entries.len(),
+            MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE,
+        );
         let removed_entries = message
             .removed_entries
             .drain(..removed_entries_chunk_size)
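
Hoisting the constant into `proto` keeps the chunk size and the new guard in `projects.rs` in lockstep. A hypothetical sketch of the resulting behavior, assuming the `test-support` chunk size of 2:

```rust
// Five updated entries split into chunks of at most two entries each.
let mut message = proto::UpdateWorktree::default();
message.updated_entries = vec![proto::Entry::default(); 5];

for chunk in proto::split_worktree_update(message) {
    assert!(chunk.updated_entries.len() <= proto::MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE);
}
```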

crates/worktree/src/worktree.rs

@@ -36,7 +36,10 @@ use postage::{
     prelude::{Sink as _, Stream as _},
     watch,
 };
-use rpc::{proto, AnyProtoClient};
+use rpc::{
+    proto::{self, split_worktree_update},
+    AnyProtoClient,
+};
 pub use settings::WorktreeId;
 use settings::{Settings, SettingsLocation, SettingsStore};
 use smallvec::{smallvec, SmallVec};
@@ -1721,11 +1724,6 @@ impl LocalWorktree {
         F: 'static + Send + Fn(proto::UpdateWorktree) -> Fut,
         Fut: Send + Future<Output = bool>,
     {
-        #[cfg(any(test, feature = "test-support"))]
-        const MAX_CHUNK_SIZE: usize = 2;
-        #[cfg(not(any(test, feature = "test-support")))]
-        const MAX_CHUNK_SIZE: usize = 256;
-
         if let Some(observer) = self.update_observer.as_mut() {
             *observer.resume_updates.borrow_mut() = ();
             return;
@@ -1751,7 +1749,7 @@ impl LocalWorktree {
                         snapshot.build_update(project_id, worktree_id, entry_changes, repo_changes);
                 }
 
-                for update in proto::split_worktree_update(update, MAX_CHUNK_SIZE) {
+                for update in proto::split_worktree_update(update) {
                     let _ = resume_updates_rx.try_recv();
                     loop {
                         let result = callback(update.clone());
@@ -1817,13 +1815,17 @@ impl RemoteWorktree {
         self.update_observer = Some(tx);
         cx.spawn(|this, mut cx| async move {
             let mut update = initial_update;
-            loop {
+            'outer: loop {
                 // SSH projects use a special project ID of 0, and we need to
                 // remap it to the correct one here.
                 update.project_id = project_id;
-                if !callback(update).await {
-                    break;
+
+                for chunk in split_worktree_update(update) {
+                    if !callback(chunk).await {
+                        break 'outer;
+                    }
                 }
+
                 if let Some(next_update) = rx.next().await {
                     update = next_update;
                 } else {