Fix collab (#8298)

Conrad Irwin and Marshall created

Co-Authored-By: Marshall <marshall@zed.dev>

We broke it by deploying two servers simultaneously.

Release Notes:

- N/A

Co-authored-by: Marshall <marshall@zed.dev>

Change summary

crates/collab/k8s/collab.template.yml |  2 
crates/collab/src/api.rs              |  8 +++-
crates/collab/src/main.rs             | 49 ++++++++++++++++++----------
3 files changed, 39 insertions(+), 20 deletions(-)

Detailed changes

crates/collab/k8s/collab.template.yml 🔗

@@ -58,7 +58,7 @@ spec:
         - name: ${ZED_SERVICE_NAME}
           image: "${ZED_IMAGE_ID}"
           args:
-            - serve
+            - serve ${ZED_SERVICE_NAME}
           ports:
             - containerPort: 8080
               protocol: TCP

crates/collab/src/api.rs 🔗

@@ -25,7 +25,7 @@ use tracing::instrument;
 
 pub use extensions::fetch_extensions_from_blob_store_periodically;
 
-pub fn routes(rpc_server: Arc<rpc::Server>, state: Arc<AppState>) -> Router<Body> {
+pub fn routes(rpc_server: Option<Arc<rpc::Server>>, state: Arc<AppState>) -> Router<Body> {
     Router::new()
         .route("/user", get(get_authenticated_user))
         .route("/users/:id/access_tokens", post(create_access_token))
@@ -136,8 +136,12 @@ async fn trace_panic(panic: Json<Panic>) -> Result<()> {
 }
 
 async fn get_rpc_server_snapshot(
-    Extension(rpc_server): Extension<Arc<rpc::Server>>,
+    Extension(rpc_server): Extension<Option<Arc<rpc::Server>>>,
 ) -> Result<ErasedJson> {
+    let Some(rpc_server) = rpc_server else {
+        return Err(Error::Internal(anyhow!("rpc server is not available")));
+    };
+
     Ok(ErasedJson::pretty(rpc_server.snapshot().await))
 }
 

crates/collab/src/main.rs 🔗

@@ -28,7 +28,8 @@ async fn main() -> Result<()> {
         );
     }
 
-    match args().skip(1).next().as_deref() {
+    let mut args = args().skip(1);
+    match args.next().as_deref() {
         Some("version") => {
             println!("collab v{} ({})", VERSION, REVISION.unwrap_or("unknown"));
         }
@@ -36,6 +37,8 @@ async fn main() -> Result<()> {
             run_migrations().await?;
         }
         Some("serve") => {
+            let is_api_only = args.next().is_some_and(|arg| arg == "api");
+
             let config = envy::from_env::<Config>().expect("error loading config");
             init_tracing(&config);
 
@@ -46,24 +49,33 @@ async fn main() -> Result<()> {
             let listener = TcpListener::bind(&format!("0.0.0.0:{}", state.config.http_port))
                 .expect("failed to bind TCP listener");
 
-            let epoch = state
-                .db
-                .create_server(&state.config.zed_environment)
-                .await?;
-            let rpc_server = collab::rpc::Server::new(epoch, state.clone(), Executor::Production);
-            rpc_server.start().await?;
+            let rpc_server = if !is_api_only {
+                let epoch = state
+                    .db
+                    .create_server(&state.config.zed_environment)
+                    .await?;
+                let rpc_server =
+                    collab::rpc::Server::new(epoch, state.clone(), Executor::Production);
+                rpc_server.start().await?;
+
+                Some(rpc_server)
+            } else {
+                None
+            };
 
             fetch_extensions_from_blob_store_periodically(state.clone(), Executor::Production);
 
-            let app = collab::api::routes(rpc_server.clone(), state.clone())
-                .merge(collab::rpc::routes(rpc_server.clone()))
-                .merge(
-                    Router::new()
-                        .route("/", get(handle_root))
-                        .route("/healthz", get(handle_liveness_probe))
-                        .merge(collab::api::events::router())
-                        .layer(Extension(state.clone())),
-                );
+            let mut app = collab::api::routes(rpc_server.clone(), state.clone());
+            if let Some(rpc_server) = rpc_server.clone() {
+                app = app.merge(collab::rpc::routes(rpc_server))
+            }
+            app = app.merge(
+                Router::new()
+                    .route("/", get(handle_root))
+                    .route("/healthz", get(handle_liveness_probe))
+                    .merge(collab::api::events::router())
+                    .layer(Extension(state.clone())),
+            );
 
             axum::Server::from_tcp(listener)?
                 .serve(app.into_make_service_with_connect_info::<SocketAddr>())
@@ -77,7 +89,10 @@ async fn main() -> Result<()> {
                     futures::pin_mut!(sigterm, sigint);
                     futures::future::select(sigterm, sigint).await;
                     tracing::info!("Received interrupt signal");
-                    rpc_server.teardown();
+
+                    if let Some(rpc_server) = rpc_server {
+                        rpc_server.teardown();
+                    }
                 })
                 .await?;
         }