updated notify to accomodate for updated countdown

KCaverly created

Change summary

crates/search/src/project_search.rs         |  2 
crates/semantic_index/src/embedding.rs      | 42 ++++++++++------------
crates/semantic_index/src/semantic_index.rs | 17 +++++++-
3 files changed, 35 insertions(+), 26 deletions(-)

Detailed changes

crates/search/src/project_search.rs 🔗

@@ -332,7 +332,7 @@ impl View for ProjectSearchView {
                                     rate_limit_expiration_time.duration_since(SystemTime::now())
                                 {
                                     Some(format!(
-                                        "Remaining files to index(rate limit resets in {}s): {}",
+                                        "Remaining files to index (rate limit resets in {}s): {}",
                                         remaining_seconds.as_secs(),
                                         remaining_files
                                     ))

crates/semantic_index/src/embedding.rs 🔗

@@ -85,8 +85,8 @@ impl ToSql for Embedding {
 pub struct OpenAIEmbeddings {
     pub client: Arc<dyn HttpClient>,
     pub executor: Arc<Background>,
-    rate_limit_count_rx: watch::Receiver<(Option<SystemTime>, usize)>,
-    rate_limit_count_tx: Arc<Mutex<watch::Sender<(Option<SystemTime>, usize)>>>,
+    rate_limit_count_rx: watch::Receiver<Option<SystemTime>>,
+    rate_limit_count_tx: Arc<Mutex<watch::Sender<Option<SystemTime>>>>,
 }
 
 #[derive(Serialize)]
@@ -159,7 +159,7 @@ const OPENAI_INPUT_LIMIT: usize = 8190;
 
 impl OpenAIEmbeddings {
     pub fn new(client: Arc<dyn HttpClient>, executor: Arc<Background>) -> Self {
-        let (rate_limit_count_tx, rate_limit_count_rx) = watch::channel_with((None, 0));
+        let (rate_limit_count_tx, rate_limit_count_rx) = watch::channel_with(None);
         let rate_limit_count_tx = Arc::new(Mutex::new(rate_limit_count_tx));
 
         OpenAIEmbeddings {
@@ -171,18 +171,22 @@ impl OpenAIEmbeddings {
     }
 
     fn resolve_rate_limit(&self) {
-        let (reset_time, delay_count) = *self.rate_limit_count_tx.lock().borrow();
-        let updated_count = delay_count - 1;
-        let updated_time = if updated_count == 0 { None } else { reset_time };
+        let reset_time = *self.rate_limit_count_tx.lock().borrow();
 
-        log::trace!("resolving rate limit: Count: {:?}", updated_count);
+        if let Some(reset_time) = reset_time {
+            if SystemTime::now() >= reset_time {
+                *self.rate_limit_count_tx.lock().borrow_mut() = None
+            }
+        }
 
-        *self.rate_limit_count_tx.lock().borrow_mut() = (updated_time, updated_count);
+        log::trace!(
+            "resolving reset time: {:?}",
+            *self.rate_limit_count_tx.lock().borrow()
+        );
     }
 
-    fn update_rate_limit(&self, reset_time: SystemTime, count_increase: usize) {
-        let (original_time, original_count) = *self.rate_limit_count_tx.lock().borrow();
-        let updated_count = original_count + count_increase;
+    fn update_reset_time(&self, reset_time: SystemTime) {
+        let original_time = *self.rate_limit_count_tx.lock().borrow();
 
         let updated_time = if let Some(original_time) = original_time {
             if reset_time < original_time {
@@ -194,9 +198,9 @@ impl OpenAIEmbeddings {
             Some(reset_time)
         };
 
-        log::trace!("updating rate limit: Count: {:?}", updated_count);
+        log::trace!("updating rate limit time: {:?}", updated_time);
 
-        *self.rate_limit_count_tx.lock().borrow_mut() = (updated_time, updated_count);
+        *self.rate_limit_count_tx.lock().borrow_mut() = updated_time;
     }
     async fn send_request(
         &self,
@@ -229,8 +233,7 @@ impl EmbeddingProvider for OpenAIEmbeddings {
     }
 
     fn rate_limit_expiration(&self) -> Option<SystemTime> {
-        let (expiration_time, _) = *self.rate_limit_count_rx.borrow();
-        expiration_time
+        *self.rate_limit_count_rx.borrow()
     }
     fn truncate(&self, span: &str) -> (String, usize) {
         let mut tokens = OPENAI_BPE_TOKENIZER.encode_with_special_tokens(span);
@@ -296,6 +299,7 @@ impl EmbeddingProvider for OpenAIEmbeddings {
                         .collect());
                 }
                 StatusCode::TOO_MANY_REQUESTS => {
+                    rate_limiting = true;
                     let mut body = String::new();
                     response.body_mut().read_to_string(&mut body).await?;
 
@@ -316,13 +320,7 @@ impl EmbeddingProvider for OpenAIEmbeddings {
 
                     // If we've previously rate limited, increment the duration but not the count
                     let reset_time = SystemTime::now().add(delay_duration);
-                    if rate_limiting {
-                        self.update_rate_limit(reset_time, 0);
-                    } else {
-                        self.update_rate_limit(reset_time, 1);
-                    }
-
-                    rate_limiting = true;
+                    self.update_reset_time(reset_time);
 
                     log::trace!(
                         "openai rate limiting: waiting {:?} until lifted",

crates/semantic_index/src/semantic_index.rs 🔗

@@ -232,9 +232,20 @@ impl ProjectState {
             _observe_pending_file_count: cx.spawn_weak({
                 let mut pending_file_count_rx = pending_file_count_rx.clone();
                 |this, mut cx| async move {
-                    while let Some(_) = pending_file_count_rx.next().await {
-                        if let Some(this) = this.upgrade(&cx) {
-                            this.update(&mut cx, |_, cx| cx.notify());
+                    loop {
+                        let mut timer = cx.background().timer(Duration::from_millis(350)).fuse();
+                        let mut pending_file_count = pending_file_count_rx.next().fuse();
+                        futures::select_biased! {
+                            _ = pending_file_count => {
+                                if let Some(this) = this.upgrade(&cx) {
+                                    this.update(&mut cx, |_, cx| cx.notify());
+                                }
+                            },
+                            _ = timer => {
+                                if let Some(this) = this.upgrade(&cx) {
+                                    this.update(&mut cx, |_, cx| cx.notify());
+                                }
+                            }
                         }
                     }
                 }