Remove `Executor` trait from `collab` and use an enum instead

Antonio Scandurra created

This will let us save off the executor and avoid using generics.

Change summary

crates/collab/src/executor.rs          | 36 ++++++++++++++++++++++++
crates/collab/src/integration_tests.rs | 23 +++------------
crates/collab/src/lib.rs               |  1 
crates/collab/src/rpc.rs               | 41 ++++-----------------------
4 files changed, 48 insertions(+), 53 deletions(-)

Detailed changes

crates/collab/src/executor.rs 🔗

@@ -0,0 +1,36 @@
+use std::{future::Future, time::Duration};
+
+#[derive(Clone)]
+pub enum Executor {
+    Production,
+    #[cfg(test)]
+    Deterministic(std::sync::Arc<gpui::executor::Background>),
+}
+
+impl Executor {
+    pub fn spawn_detached<F>(&self, future: F)
+    where
+        F: 'static + Send + Future<Output = ()>,
+    {
+        match self {
+            Executor::Production => {
+                tokio::spawn(future);
+            }
+            #[cfg(test)]
+            Executor::Deterministic(background) => {
+                background.spawn(future).detach();
+            }
+        }
+    }
+
+    pub fn sleep(&self, duration: Duration) -> impl Future<Output = ()> {
+        let this = self.clone();
+        async move {
+            match this {
+                Executor::Production => tokio::time::sleep(duration).await,
+                #[cfg(test)]
+                Executor::Deterministic(background) => background.timer(duration).await,
+            }
+        }
+    }
+}

crates/collab/src/integration_tests.rs 🔗

@@ -1,9 +1,9 @@
 use crate::{
     db::{self, NewUserParams, TestDb, UserId},
-    rpc::{Executor, Server},
+    executor::Executor,
+    rpc::Server,
     AppState,
 };
-
 use ::rpc::Peer;
 use anyhow::anyhow;
 use call::{room, ActiveCall, ParticipantLocation, Room};
@@ -17,7 +17,7 @@ use editor::{
     ToggleCodeActions, Undo,
 };
 use fs::{FakeFs, Fs as _, HomeDir, LineEnding};
-use futures::{channel::oneshot, Future, StreamExt as _};
+use futures::{channel::oneshot, StreamExt as _};
 use gpui::{
     executor::{self, Deterministic},
     geometry::vector::vec2f,
@@ -45,7 +45,6 @@ use std::{
         atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
         Arc,
     },
-    time::Duration,
 };
 use theme::ThemeRegistry;
 use unindent::Unindent as _;
@@ -417,7 +416,7 @@ async fn test_leaving_room_on_disconnection(
 
     // When user A disconnects, both client A and B clear their room on the active call.
     server.disconnect_client(client_a.peer_id().unwrap());
-    cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
+    deterministic.advance_clock(rpc::RECEIVE_TIMEOUT);
     active_call_a.read_with(cx_a, |call, _| assert!(call.room().is_none()));
     active_call_b.read_with(cx_b, |call, _| assert!(call.room().is_none()));
     assert_eq!(
@@ -6000,7 +5999,7 @@ impl TestServer {
                                 client_name,
                                 user,
                                 Some(connection_id_tx),
-                                cx.background(),
+                                Executor::Deterministic(cx.background()),
                             ))
                             .detach();
                         let connection_id = connection_id_rx.await.unwrap();
@@ -6829,18 +6828,6 @@ impl Drop for TestClient {
     }
 }
 
-impl Executor for Arc<gpui::executor::Background> {
-    type Sleep = gpui::executor::Timer;
-
-    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
-        self.spawn(future).detach();
-    }
-
-    fn sleep(&self, duration: Duration) -> Self::Sleep {
-        self.as_ref().timer(duration)
-    }
-}
-
 #[derive(Debug, Eq, PartialEq)]
 struct RoomParticipants {
     remote: Vec<String>,

crates/collab/src/lib.rs 🔗

@@ -2,6 +2,7 @@ pub mod api;
 pub mod auth;
 pub mod db;
 pub mod env;
+mod executor;
 #[cfg(test)]
 mod integration_tests;
 pub mod rpc;

crates/collab/src/rpc.rs 🔗

@@ -3,6 +3,7 @@ mod connection_pool;
 use crate::{
     auth,
     db::{self, Database, ProjectId, RoomId, User, UserId},
+    executor::Executor,
     AppState, Result,
 };
 use anyhow::anyhow;
@@ -50,12 +51,8 @@ use std::{
         atomic::{AtomicBool, Ordering::SeqCst},
         Arc,
     },
-    time::Duration,
-};
-use tokio::{
-    sync::{Mutex, MutexGuard},
-    time::Sleep,
 };
+use tokio::sync::{Mutex, MutexGuard};
 use tower::ServiceBuilder;
 use tracing::{info_span, instrument, Instrument};
 
@@ -145,15 +142,6 @@ pub struct Server {
     handlers: HashMap<TypeId, MessageHandler>,
 }
 
-pub trait Executor: Send + Clone {
-    type Sleep: Send + Future;
-    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
-    fn sleep(&self, duration: Duration) -> Self::Sleep;
-}
-
-#[derive(Clone)]
-pub struct RealExecutor;
-
 pub(crate) struct ConnectionPoolGuard<'a> {
     guard: MutexGuard<'a, ConnectionPool>,
     _not_send: PhantomData<Rc<()>>,
@@ -330,13 +318,13 @@ impl Server {
         })
     }
 
-    pub fn handle_connection<E: Executor>(
+    pub fn handle_connection(
         self: &Arc<Self>,
         connection: Connection,
         address: String,
         user: User,
         mut send_connection_id: Option<oneshot::Sender<ConnectionId>>,
-        executor: E,
+        executor: Executor,
     ) -> impl Future<Output = Result<()>> {
         let this = self.clone();
         let user_id = user.id;
@@ -347,12 +335,7 @@ impl Server {
                 .peer
                 .add_connection(connection, {
                     let executor = executor.clone();
-                    move |duration| {
-                        let timer = executor.sleep(duration);
-                        async move {
-                            timer.await;
-                        }
-                    }
+                    move |duration| executor.sleep(duration)
                 });
 
             tracing::info!(%user_id, %login, %connection_id, %address, "connection opened");
@@ -543,18 +526,6 @@ impl<'a> Drop for ConnectionPoolGuard<'a> {
     }
 }
 
-impl Executor for RealExecutor {
-    type Sleep = Sleep;
-
-    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
-        tokio::task::spawn(future);
-    }
-
-    fn sleep(&self, duration: Duration) -> Self::Sleep {
-        tokio::time::sleep(duration)
-    }
-}
-
 fn broadcast<F>(
     sender_id: ConnectionId,
     receiver_ids: impl IntoIterator<Item = ConnectionId>,
@@ -636,7 +607,7 @@ pub async fn handle_websocket_request(
         let connection = Connection::new(Box::pin(socket));
         async move {
             server
-                .handle_connection(connection, socket_address, user, None, RealExecutor)
+                .handle_connection(connection, socket_address, user, None, Executor::Production)
                 .await
                 .log_err();
         }