ssh remoting: Ensure only single instance can upload binary (#19597)

Thorsten Ball created

This creates a `<binary_path>.lock` file and checks for it whenever
uploading a binary.

Parameters:
- Wait for other instance to finish: 10m
- Mark lockfile as stale and ignore it after: 10m
- When waiting on another process, check every 5seconds

We can tweak all of them.

Ideally we'd have a value in the lockfile, that lets us detect whether
the other process is still there, but I haven't found something stable
yet:

- We don't have a stable PID on the server side when we run multiple
commands
- The `ControlPath` is on the client side

Release Notes:

- N/A

Change summary

crates/remote/src/ssh_session.rs | 112 +++++++++++++++++++++++++++++++++
1 file changed, 111 insertions(+), 1 deletion(-)

Detailed changes

crates/remote/src/ssh_session.rs 🔗

@@ -40,7 +40,7 @@ use std::{
         atomic::{AtomicU32, Ordering::SeqCst},
         Arc, Weak,
     },
-    time::{Duration, Instant},
+    time::{Duration, Instant, SystemTime, UNIX_EPOCH},
 };
 use tempfile::TempDir;
 use util::ResultExt;
@@ -1359,6 +1359,116 @@ impl SshRemoteConnection {
         dst_path: &Path,
         platform: SshPlatform,
         cx: &mut AsyncAppContext,
+    ) -> Result<()> {
+        let lock_file = dst_path.with_extension("lock");
+        let timestamp = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .unwrap()
+            .as_secs();
+        let lock_content = timestamp.to_string();
+
+        let lock_stale_age = Duration::from_secs(10 * 60);
+        let max_wait_time = Duration::from_secs(10 * 60);
+        let check_interval = Duration::from_secs(5);
+        let start_time = Instant::now();
+
+        loop {
+            let lock_acquired = self.create_lock_file(&lock_file, &lock_content).await?;
+            if lock_acquired {
+                let result = self
+                    .update_server_binary_if_needed(delegate, dst_path, platform, cx)
+                    .await;
+
+                self.remove_lock_file(&lock_file).await.ok();
+
+                return result;
+            } else {
+                if let Ok(is_stale) = self.is_lock_stale(&lock_file, &lock_stale_age).await {
+                    if is_stale {
+                        self.remove_lock_file(&lock_file).await?;
+                        continue;
+                    } else {
+                        if start_time.elapsed() > max_wait_time {
+                            return Err(anyhow!("Timeout waiting for lock to be released"));
+                        }
+                        log::info!(
+                            "Found lockfile: {:?}. Will check again in {:?}",
+                            lock_file,
+                            check_interval
+                        );
+                        delegate.set_status(
+                            Some("Waiting for another Zed instance to finish uploading binary..."),
+                            cx,
+                        );
+                        smol::Timer::after(check_interval).await;
+                        continue;
+                    }
+                } else {
+                    // Unable to check lock, assume it's valid and wait
+                    if start_time.elapsed() > max_wait_time {
+                        return Err(anyhow!("Timeout waiting for lock to be released"));
+                    }
+                    smol::Timer::after(check_interval).await;
+                    continue;
+                }
+            }
+        }
+    }
+
+    async fn create_lock_file(&self, lock_file: &Path, content: &str) -> Result<bool> {
+        let parent_dir = lock_file
+            .parent()
+            .ok_or_else(|| anyhow!("Lock file path has no parent directory"))?;
+
+        // Be mindful of the escaping here: we need to make sure that we have quotes
+        // inside the string, so that `sh -c` gets a quoted string passed to it.
+        let script = format!(
+            "\"mkdir -p '{0}' &&  [ ! -f '{1}' ] && echo '{2}' > '{1}' && echo 'created' || echo 'exists'\"",
+            parent_dir.display(),
+            lock_file.display(),
+            content
+        );
+
+        let output = run_cmd(self.socket.ssh_command("sh").arg("-c").arg(&script))
+            .await
+            .with_context(|| format!("failed to create a lock file at {:?}", lock_file))?;
+
+        Ok(output.trim() == "created")
+    }
+
+    async fn is_lock_stale(&self, lock_file: &Path, max_age: &Duration) -> Result<bool> {
+        let threshold = max_age.as_secs();
+
+        // Be mindful of the escaping here: we need to make sure that we have quotes
+        // inside the string, so that `sh -c` gets a quoted string passed to it.
+        let script = format!(
+            "\"[ -f '{0}' ] && [ $(( $(date +%s) - $(date -r '{0}' +%s) )) -gt {1} ] && echo 'stale' ||  echo 'recent'\"",
+            lock_file.display(),
+            threshold
+        );
+
+        let output = run_cmd(self.socket.ssh_command("sh").arg("-c").arg(script))
+            .await
+            .with_context(|| {
+                format!("failed to check whether lock file {:?} is stale", lock_file)
+            })?;
+
+        Ok(output.trim() == "stale")
+    }
+
+    async fn remove_lock_file(&self, lock_file: &Path) -> Result<()> {
+        run_cmd(self.socket.ssh_command("rm").arg("-f").arg(lock_file))
+            .await
+            .context("failed to remove lock file")?;
+        Ok(())
+    }
+
+    async fn update_server_binary_if_needed(
+        &self,
+        delegate: &Arc<dyn SshClientDelegate>,
+        dst_path: &Path,
+        platform: SshPlatform,
+        cx: &mut AsyncAppContext,
     ) -> Result<()> {
         if std::env::var("ZED_USE_CACHED_REMOTE_SERVER").is_ok() {
             if let Ok(installed_version) =