Detailed changes
@@ -732,10 +732,6 @@ impl EditAgent {
stop: Vec::new(),
temperature: None,
thinking_allowed: true,
- // Bypass the rate limiter for nested requests (edit agent requests spawned
- // from within a tool call) to avoid deadlocks when multiple subagents try
- // to use edit_file simultaneously.
- bypass_rate_limit: true,
};
Ok(self.model.stream_completion_text(request, cx).await?.stream)
@@ -1663,6 +1663,13 @@ impl Thread {
}
}
+ // Drop the stream to release the rate limit permit before tool execution.
+ // The stream holds a semaphore guard that limits concurrent requests.
+ // Without this, the permit would be held during potentially long-running
+ // tool execution, which could cause deadlocks when tools spawn subagents
+ // that need their own permits.
+ drop(events);
+
let end_turn = tool_results.is_empty();
while let Some(tool_result) = tool_results.next().await {
log::debug!("Tool finished {:?}", tool_result);
@@ -2263,7 +2270,6 @@ impl Thread {
stop: Vec::new(),
temperature: AgentSettings::temperature_for_model(model, cx),
thinking_allowed: self.thinking_enabled,
- bypass_rate_limit: false,
};
log::debug!("Completion request built successfully");
@@ -544,7 +544,6 @@ impl CodegenAlternative {
temperature,
messages,
thinking_allowed: false,
- bypass_rate_limit: false,
}
}))
}
@@ -623,7 +622,6 @@ impl CodegenAlternative {
temperature,
messages: vec![request_message],
thinking_allowed: false,
- bypass_rate_limit: false,
}
}))
}
@@ -275,7 +275,6 @@ impl TerminalInlineAssistant {
stop: Vec::new(),
temperature,
thinking_allowed: false,
- bypass_rate_limit: false,
}
}))
}
@@ -2269,7 +2269,6 @@ impl TextThread {
stop: Vec::new(),
temperature: model.and_then(|model| AgentSettings::temperature_for_model(model, cx)),
thinking_allowed: true,
- bypass_rate_limit: false,
};
for message in self.messages(cx) {
if message.status != MessageStatus::Done {
@@ -563,7 +563,6 @@ impl ExampleInstance {
tool_choice: None,
stop: Vec::new(),
thinking_allowed: true,
- bypass_rate_limit: false,
};
let model = model.clone();
@@ -2691,7 +2691,6 @@ impl GitPanel {
stop: Vec::new(),
temperature,
thinking_allowed: false,
- bypass_rate_limit: false,
};
let stream = model.stream_completion_text(request, cx);
@@ -16,7 +16,7 @@ pub struct RateLimiter {
pub struct RateLimitGuard<T> {
inner: T,
- _guard: Option<SemaphoreGuardArc>,
+ _guard: SemaphoreGuardArc,
}
impl<T> Stream for RateLimitGuard<T>
@@ -68,36 +68,6 @@ impl RateLimiter {
async move {
let guard = guard.await;
let inner = future.await?;
- Ok(RateLimitGuard {
- inner,
- _guard: Some(guard),
- })
- }
- }
-
- /// Like `stream`, but conditionally bypasses the rate limiter based on the flag.
- /// Used for nested requests (like edit agent requests) that are already "part of"
- /// a rate-limited request to avoid deadlocks.
- pub fn stream_with_bypass<'a, Fut, T>(
- &self,
- future: Fut,
- bypass: bool,
- ) -> impl 'a
- + Future<
- Output = Result<impl Stream<Item = T::Item> + use<Fut, T>, LanguageModelCompletionError>,
- >
- where
- Fut: 'a + Future<Output = Result<T, LanguageModelCompletionError>>,
- T: Stream,
- {
- let semaphore = self.semaphore.clone();
- async move {
- let guard = if bypass {
- None
- } else {
- Some(semaphore.acquire_arc().await)
- };
- let inner = future.await?;
Ok(RateLimitGuard {
inner,
_guard: guard,
@@ -105,190 +75,3 @@ impl RateLimiter {
}
}
}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use futures::stream;
- use smol::lock::Barrier;
- use std::sync::Arc;
- use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
- use std::time::{Duration, Instant};
-
- /// Tests that nested requests without bypass_rate_limit cause deadlock,
- /// while requests with bypass_rate_limit complete successfully.
- ///
- /// This test simulates the scenario where multiple "parent" requests each
- /// try to spawn a "nested" request (like edit_file tool spawning an edit agent).
- /// With a rate limit of 2 and 2 parent requests, without bypass the nested
- /// requests would block forever waiting for permits that the parents hold.
- #[test]
- fn test_nested_requests_bypass_prevents_deadlock() {
- smol::block_on(async {
- // Use only 2 permits so we can guarantee deadlock conditions
- let rate_limiter = RateLimiter::new(2);
- let completed = Arc::new(AtomicUsize::new(0));
- // Barrier ensures all parents acquire permits before any tries nested request
- let barrier = Arc::new(Barrier::new(2));
-
- // Spawn 2 "parent" requests that each try to make a "nested" request
- let mut handles = Vec::new();
- for _ in 0..2 {
- let limiter = rate_limiter.clone();
- let completed = completed.clone();
- let barrier = barrier.clone();
-
- let handle = smol::spawn(async move {
- // Parent request acquires a permit via stream_with_bypass (bypass=false)
- let parent_stream = limiter
- .stream_with_bypass(
- async {
- // Wait for all parents to acquire permits
- barrier.wait().await;
-
- // While holding the parent permit, make a nested request
- // WITH bypass=true (simulating EditAgent behavior)
- let nested_stream = limiter
- .stream_with_bypass(
- async { Ok(stream::iter(vec![1, 2, 3])) },
- true, // bypass - this is the key!
- )
- .await?;
-
- // Consume the nested stream
- use futures::StreamExt;
- let _: Vec<_> = nested_stream.collect().await;
-
- Ok(stream::iter(vec!["done"]))
- },
- false, // parent does NOT bypass
- )
- .await
- .unwrap();
-
- // Consume parent stream
- use futures::StreamExt;
- let _: Vec<_> = parent_stream.collect().await;
-
- completed.fetch_add(1, Ordering::SeqCst);
- });
- handles.push(handle);
- }
-
- // With bypass=true for nested requests, this should complete quickly
- let timed_out = Arc::new(AtomicBool::new(false));
- let timed_out_clone = timed_out.clone();
-
- // Spawn a watchdog that sets timed_out after 2 seconds
- let watchdog = smol::spawn(async move {
- let start = Instant::now();
- while start.elapsed() < Duration::from_secs(2) {
- smol::future::yield_now().await;
- }
- timed_out_clone.store(true, Ordering::SeqCst);
- });
-
- // Wait for all handles to complete
- for handle in handles {
- handle.await;
- }
-
- // Cancel the watchdog
- drop(watchdog);
-
- if timed_out.load(Ordering::SeqCst) {
- panic!(
- "Test timed out - deadlock detected! This means bypass_rate_limit is not working."
- );
- }
- assert_eq!(completed.load(Ordering::SeqCst), 2);
- });
- }
-
- /// Tests that without bypass, nested requests DO cause deadlock.
- /// This test verifies the problem exists when bypass is not used.
- #[test]
- fn test_nested_requests_without_bypass_deadlocks() {
- smol::block_on(async {
- // Use only 2 permits so we can guarantee deadlock conditions
- let rate_limiter = RateLimiter::new(2);
- let completed = Arc::new(AtomicUsize::new(0));
- // Barrier ensures all parents acquire permits before any tries nested request
- let barrier = Arc::new(Barrier::new(2));
-
- // Spawn 2 "parent" requests that each try to make a "nested" request
- let mut handles = Vec::new();
- for _ in 0..2 {
- let limiter = rate_limiter.clone();
- let completed = completed.clone();
- let barrier = barrier.clone();
-
- let handle = smol::spawn(async move {
- // Parent request acquires a permit
- let parent_stream = limiter
- .stream_with_bypass(
- async {
- // Wait for all parents to acquire permits - this guarantees
- // that all 2 permits are held before any nested request starts
- barrier.wait().await;
-
- // Nested request WITHOUT bypass - this will deadlock!
- // Both parents hold permits, so no permits available
- let nested_stream = limiter
- .stream_with_bypass(
- async { Ok(stream::iter(vec![1, 2, 3])) },
- false, // NO bypass - will try to acquire permit
- )
- .await?;
-
- use futures::StreamExt;
- let _: Vec<_> = nested_stream.collect().await;
-
- Ok(stream::iter(vec!["done"]))
- },
- false,
- )
- .await
- .unwrap();
-
- use futures::StreamExt;
- let _: Vec<_> = parent_stream.collect().await;
-
- completed.fetch_add(1, Ordering::SeqCst);
- });
- handles.push(handle);
- }
-
- // This SHOULD timeout because of deadlock (both parents hold permits,
- // both nested requests wait for permits)
- let timed_out = Arc::new(AtomicBool::new(false));
- let timed_out_clone = timed_out.clone();
-
- // Spawn a watchdog that sets timed_out after 100ms
- let watchdog = smol::spawn(async move {
- let start = Instant::now();
- while start.elapsed() < Duration::from_millis(100) {
- smol::future::yield_now().await;
- }
- timed_out_clone.store(true, Ordering::SeqCst);
- });
-
- // Poll briefly to let everything run
- let start = Instant::now();
- while start.elapsed() < Duration::from_millis(100) {
- smol::future::yield_now().await;
- }
-
- // Cancel the watchdog
- drop(watchdog);
-
- // Expected - deadlock occurred, which proves the bypass is necessary
- let count = completed.load(Ordering::SeqCst);
- assert_eq!(
- count, 0,
- "Expected complete deadlock (0 completed) but {} requests completed",
- count
- );
- });
- }
-}
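Taken together, the rate_limiter.rs changes collapse the API back to a single acquire path: every request waits for a permit, and the returned wrapper always owns one. The result reduces to roughly the following shape (a sketch only: the signature is simplified to an `async fn`, the error type is genericized, and the `Stream` forwarding impl and exact import paths are assumed rather than taken from the file):

```rust
use std::future::Future;
use std::sync::Arc;

use futures::Stream;
use smol::lock::{Semaphore, SemaphoreGuardArc};

#[derive(Clone)]
pub struct RateLimiter {
    semaphore: Arc<Semaphore>,
}

// The `impl Stream for RateLimitGuard<T>` forwarding is omitted from this
// sketch; the point is only that the guard is now held unconditionally.
#[allow(dead_code)]
pub struct RateLimitGuard<T> {
    inner: T,
    _guard: SemaphoreGuardArc,
}

impl RateLimiter {
    pub fn new(limit: usize) -> Self {
        Self {
            semaphore: Arc::new(Semaphore::new(limit)),
        }
    }

    pub async fn stream<T, E>(
        &self,
        future: impl Future<Output = Result<T, E>>,
    ) -> Result<RateLimitGuard<T>, E>
    where
        T: Stream,
    {
        // Acquire a permit first; it rides along inside the returned wrapper
        // and is released when the caller drops the stream.
        let guard = self.semaphore.acquire_arc().await;
        let inner = future.await?;
        Ok(RateLimitGuard {
            inner,
            _guard: guard,
        })
    }
}
```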
@@ -451,11 +451,6 @@ pub struct LanguageModelRequest {
pub stop: Vec<String>,
pub temperature: Option<f32>,
pub thinking_allowed: bool,
- /// When true, this request bypasses the rate limiter. Used for nested requests
- /// (like edit agent requests spawned from within a tool call) that are already
- /// "part of" a rate-limited request to avoid deadlocks.
- #[serde(default)]
- pub bypass_rate_limit: bool,
}
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
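Removing the field also removes its `#[serde(default)]` attribute, but previously serialized requests that still carry a `bypass_rate_limit` key should keep deserializing, since serde ignores unknown keys unless the struct opts into `deny_unknown_fields` (not visible in this diff, so treated as an assumption). A hypothetical subset of the struct illustrates the behavior:

```rust
// Hypothetical subset of LanguageModelRequest, used only to illustrate serde's
// default handling of unknown keys; it is not the real struct definition.
use serde::Deserialize;

#[derive(Deserialize, Debug)]
struct RequestSubset {
    thinking_allowed: bool,
}

fn main() {
    // A payload written before this change, still carrying the removed flag.
    let old_payload = r#"{ "thinking_allowed": true, "bypass_rate_limit": true }"#;
    let parsed: RequestSubset =
        serde_json::from_str(old_payload).expect("unknown keys are ignored by default");
    assert!(parsed.thinking_allowed);
}
```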
@@ -578,7 +578,6 @@ impl LanguageModel for AnthropicModel {
LanguageModelCompletionError,
>,
> {
- let bypass_rate_limit = request.bypass_rate_limit;
let request = into_anthropic(
request,
self.model.request_id().into(),
@@ -587,13 +586,10 @@ impl LanguageModel for AnthropicModel {
self.model.mode(),
);
let request = self.stream_completion(request, cx);
- let future = self.request_limiter.stream_with_bypass(
- async move {
- let response = request.await?;
- Ok(AnthropicEventMapper::new().map_stream(response))
- },
- bypass_rate_limit,
- );
+ let future = self.request_limiter.stream(async move {
+ let response = request.await?;
+ Ok(AnthropicEventMapper::new().map_stream(response))
+ });
async move { Ok(future.await?.boxed()) }.boxed()
}
@@ -1168,7 +1164,6 @@ mod tests {
tools: vec![],
tool_choice: None,
thinking_allowed: true,
- bypass_rate_limit: false,
};
let anthropic_request = into_anthropic(
@@ -679,7 +679,6 @@ impl LanguageModel for BedrockModel {
};
let deny_tool_calls = request.tool_choice == Some(LanguageModelToolChoice::None);
- let bypass_rate_limit = request.bypass_rate_limit;
let request = match into_bedrock(
request,
@@ -694,19 +693,16 @@ impl LanguageModel for BedrockModel {
};
let request = self.stream_completion(request, cx);
- let future = self.request_limiter.stream_with_bypass(
- async move {
- let response = request.await.map_err(|err| anyhow!(err))?;
- let events = map_to_language_model_completion_events(response);
+ let future = self.request_limiter.stream(async move {
+ let response = request.await.map_err(|err| anyhow!(err))?;
+ let events = map_to_language_model_completion_events(response);
- if deny_tool_calls {
- Ok(deny_tool_use_events(events).boxed())
- } else {
- Ok(events.boxed())
- }
- },
- bypass_rate_limit,
- );
+ if deny_tool_calls {
+ Ok(deny_tool_use_events(events).boxed())
+ } else {
+ Ok(events.boxed())
+ }
+ });
async move { Ok(future.await?.boxed()) }.boxed()
}
@@ -730,7 +730,6 @@ impl LanguageModel for CloudLanguageModel {
let thread_id = request.thread_id.clone();
let prompt_id = request.prompt_id.clone();
let intent = request.intent;
- let bypass_rate_limit = request.bypass_rate_limit;
let app_version = Some(cx.update(|cx| AppVersion::global(cx)));
let thinking_allowed = request.thinking_allowed;
let is_thinking_toggle_enabled =
@@ -758,42 +757,37 @@ impl LanguageModel for CloudLanguageModel {
);
let client = self.client.clone();
let llm_api_token = self.llm_api_token.clone();
- let future = self.request_limiter.stream_with_bypass(
- async move {
- let PerformLlmCompletionResponse {
- response,
- includes_status_messages,
- } = Self::perform_llm_completion(
- client.clone(),
- llm_api_token,
- app_version,
- CompletionBody {
- thread_id,
- prompt_id,
- intent,
- provider: cloud_llm_client::LanguageModelProvider::Anthropic,
- model: request.model.clone(),
- provider_request: serde_json::to_value(&request)
- .map_err(|e| anyhow!(e))?,
- },
- )
- .await
- .map_err(|err| {
- match err.downcast::<ApiError>() {
- Ok(api_err) => anyhow!(LanguageModelCompletionError::from(api_err)),
- Err(err) => anyhow!(err),
- }
- })?;
-
- let mut mapper = AnthropicEventMapper::new();
- Ok(map_cloud_completion_events(
- Box::pin(response_lines(response, includes_status_messages)),
- &provider_name,
- move |event| mapper.map_event(event),
- ))
- },
- bypass_rate_limit,
- );
+ let future = self.request_limiter.stream(async move {
+ let PerformLlmCompletionResponse {
+ response,
+ includes_status_messages,
+ } = Self::perform_llm_completion(
+ client.clone(),
+ llm_api_token,
+ app_version,
+ CompletionBody {
+ thread_id,
+ prompt_id,
+ intent,
+ provider: cloud_llm_client::LanguageModelProvider::Anthropic,
+ model: request.model.clone(),
+ provider_request: serde_json::to_value(&request)
+ .map_err(|e| anyhow!(e))?,
+ },
+ )
+ .await
+ .map_err(|err| match err.downcast::<ApiError>() {
+ Ok(api_err) => anyhow!(LanguageModelCompletionError::from(api_err)),
+ Err(err) => anyhow!(err),
+ })?;
+
+ let mut mapper = AnthropicEventMapper::new();
+ Ok(map_cloud_completion_events(
+ Box::pin(response_lines(response, includes_status_messages)),
+ &provider_name,
+ move |event| mapper.map_event(event),
+ ))
+ });
async move { Ok(future.await?.boxed()) }.boxed()
}
cloud_llm_client::LanguageModelProvider::OpenAi => {
@@ -808,36 +802,33 @@ impl LanguageModel for CloudLanguageModel {
None,
None,
);
- let future = self.request_limiter.stream_with_bypass(
- async move {
- let PerformLlmCompletionResponse {
- response,
- includes_status_messages,
- } = Self::perform_llm_completion(
- client.clone(),
- llm_api_token,
- app_version,
- CompletionBody {
- thread_id,
- prompt_id,
- intent,
- provider: cloud_llm_client::LanguageModelProvider::OpenAi,
- model: request.model.clone(),
- provider_request: serde_json::to_value(&request)
- .map_err(|e| anyhow!(e))?,
- },
- )
- .await?;
-
- let mut mapper = OpenAiResponseEventMapper::new();
- Ok(map_cloud_completion_events(
- Box::pin(response_lines(response, includes_status_messages)),
- &provider_name,
- move |event| mapper.map_event(event),
- ))
- },
- bypass_rate_limit,
- );
+ let future = self.request_limiter.stream(async move {
+ let PerformLlmCompletionResponse {
+ response,
+ includes_status_messages,
+ } = Self::perform_llm_completion(
+ client.clone(),
+ llm_api_token,
+ app_version,
+ CompletionBody {
+ thread_id,
+ prompt_id,
+ intent,
+ provider: cloud_llm_client::LanguageModelProvider::OpenAi,
+ model: request.model.clone(),
+ provider_request: serde_json::to_value(&request)
+ .map_err(|e| anyhow!(e))?,
+ },
+ )
+ .await?;
+
+ let mut mapper = OpenAiResponseEventMapper::new();
+ Ok(map_cloud_completion_events(
+ Box::pin(response_lines(response, includes_status_messages)),
+ &provider_name,
+ move |event| mapper.map_event(event),
+ ))
+ });
async move { Ok(future.await?.boxed()) }.boxed()
}
cloud_llm_client::LanguageModelProvider::XAi => {
@@ -851,36 +842,33 @@ impl LanguageModel for CloudLanguageModel {
None,
);
let llm_api_token = self.llm_api_token.clone();
- let future = self.request_limiter.stream_with_bypass(
- async move {
- let PerformLlmCompletionResponse {
- response,
- includes_status_messages,
- } = Self::perform_llm_completion(
- client.clone(),
- llm_api_token,
- app_version,
- CompletionBody {
- thread_id,
- prompt_id,
- intent,
- provider: cloud_llm_client::LanguageModelProvider::XAi,
- model: request.model.clone(),
- provider_request: serde_json::to_value(&request)
- .map_err(|e| anyhow!(e))?,
- },
- )
- .await?;
-
- let mut mapper = OpenAiEventMapper::new();
- Ok(map_cloud_completion_events(
- Box::pin(response_lines(response, includes_status_messages)),
- &provider_name,
- move |event| mapper.map_event(event),
- ))
- },
- bypass_rate_limit,
- );
+ let future = self.request_limiter.stream(async move {
+ let PerformLlmCompletionResponse {
+ response,
+ includes_status_messages,
+ } = Self::perform_llm_completion(
+ client.clone(),
+ llm_api_token,
+ app_version,
+ CompletionBody {
+ thread_id,
+ prompt_id,
+ intent,
+ provider: cloud_llm_client::LanguageModelProvider::XAi,
+ model: request.model.clone(),
+ provider_request: serde_json::to_value(&request)
+ .map_err(|e| anyhow!(e))?,
+ },
+ )
+ .await?;
+
+ let mut mapper = OpenAiEventMapper::new();
+ Ok(map_cloud_completion_events(
+ Box::pin(response_lines(response, includes_status_messages)),
+ &provider_name,
+ move |event| mapper.map_event(event),
+ ))
+ });
async move { Ok(future.await?.boxed()) }.boxed()
}
cloud_llm_client::LanguageModelProvider::Google => {
@@ -888,36 +876,33 @@ impl LanguageModel for CloudLanguageModel {
let request =
into_google(request, self.model.id.to_string(), GoogleModelMode::Default);
let llm_api_token = self.llm_api_token.clone();
- let future = self.request_limiter.stream_with_bypass(
- async move {
- let PerformLlmCompletionResponse {
- response,
- includes_status_messages,
- } = Self::perform_llm_completion(
- client.clone(),
- llm_api_token,
- app_version,
- CompletionBody {
- thread_id,
- prompt_id,
- intent,
- provider: cloud_llm_client::LanguageModelProvider::Google,
- model: request.model.model_id.clone(),
- provider_request: serde_json::to_value(&request)
- .map_err(|e| anyhow!(e))?,
- },
- )
- .await?;
-
- let mut mapper = GoogleEventMapper::new();
- Ok(map_cloud_completion_events(
- Box::pin(response_lines(response, includes_status_messages)),
- &provider_name,
- move |event| mapper.map_event(event),
- ))
- },
- bypass_rate_limit,
- );
+ let future = self.request_limiter.stream(async move {
+ let PerformLlmCompletionResponse {
+ response,
+ includes_status_messages,
+ } = Self::perform_llm_completion(
+ client.clone(),
+ llm_api_token,
+ app_version,
+ CompletionBody {
+ thread_id,
+ prompt_id,
+ intent,
+ provider: cloud_llm_client::LanguageModelProvider::Google,
+ model: request.model.model_id.clone(),
+ provider_request: serde_json::to_value(&request)
+ .map_err(|e| anyhow!(e))?,
+ },
+ )
+ .await?;
+
+ let mut mapper = GoogleEventMapper::new();
+ Ok(map_cloud_completion_events(
+ Box::pin(response_lines(response, includes_status_messages)),
+ &provider_name,
+ move |event| mapper.map_event(event),
+ ))
+ });
async move { Ok(future.await?.boxed()) }.boxed()
}
}
@@ -307,7 +307,6 @@ impl LanguageModel for CopilotChatLanguageModel {
LanguageModelCompletionError,
>,
> {
- let bypass_rate_limit = request.bypass_rate_limit;
let is_user_initiated = request.intent.is_none_or(|intent| match intent {
CompletionIntent::UserPrompt
| CompletionIntent::ThreadContextSummarization
@@ -328,14 +327,11 @@ impl LanguageModel for CopilotChatLanguageModel {
let request =
CopilotChat::stream_response(responses_request, is_user_initiated, cx.clone());
request_limiter
- .stream_with_bypass(
- async move {
- let stream = request.await?;
- let mapper = CopilotResponsesEventMapper::new();
- Ok(mapper.map_stream(stream).boxed())
- },
- bypass_rate_limit,
- )
+ .stream(async move {
+ let stream = request.await?;
+ let mapper = CopilotResponsesEventMapper::new();
+ Ok(mapper.map_stream(stream).boxed())
+ })
.await
});
return async move { Ok(future.await?.boxed()) }.boxed();
@@ -352,16 +348,13 @@ impl LanguageModel for CopilotChatLanguageModel {
let request =
CopilotChat::stream_completion(copilot_request, is_user_initiated, cx.clone());
request_limiter
- .stream_with_bypass(
- async move {
- let response = request.await?;
- Ok(map_to_language_model_completion_events(
- response,
- is_streaming,
- ))
- },
- bypass_rate_limit,
- )
+ .stream(async move {
+ let response = request.await?;
+ Ok(map_to_language_model_completion_events(
+ response,
+ is_streaming,
+ ))
+ })
.await
});
async move { Ok(future.await?.boxed()) }.boxed()
@@ -936,7 +929,6 @@ fn into_copilot_responses(
stop: _,
temperature,
thinking_allowed: _,
- bypass_rate_limit: _,
} = request;
let mut input_items: Vec<responses::ResponseInputItem> = Vec::new();
@@ -199,7 +199,6 @@ impl DeepSeekLanguageModel {
fn stream_completion(
&self,
request: deepseek::Request,
- bypass_rate_limit: bool,
cx: &AsyncApp,
) -> BoxFuture<'static, Result<BoxStream<'static, Result<deepseek::StreamResponse>>>> {
let http_client = self.http_client.clone();
@@ -209,20 +208,17 @@ impl DeepSeekLanguageModel {
(state.api_key_state.key(&api_url), api_url)
});
- let future = self.request_limiter.stream_with_bypass(
- async move {
- let Some(api_key) = api_key else {
- return Err(LanguageModelCompletionError::NoApiKey {
- provider: PROVIDER_NAME,
- });
- };
- let request =
- deepseek::stream_completion(http_client.as_ref(), &api_url, &api_key, request);
- let response = request.await?;
- Ok(response)
- },
- bypass_rate_limit,
- );
+ let future = self.request_limiter.stream(async move {
+ let Some(api_key) = api_key else {
+ return Err(LanguageModelCompletionError::NoApiKey {
+ provider: PROVIDER_NAME,
+ });
+ };
+ let request =
+ deepseek::stream_completion(http_client.as_ref(), &api_url, &api_key, request);
+ let response = request.await?;
+ Ok(response)
+ });
async move { Ok(future.await?.boxed()) }.boxed()
}
@@ -306,9 +302,8 @@ impl LanguageModel for DeepSeekLanguageModel {
LanguageModelCompletionError,
>,
> {
- let bypass_rate_limit = request.bypass_rate_limit;
let request = into_deepseek(request, &self.model, self.max_output_tokens());
- let stream = self.stream_completion(request, bypass_rate_limit, cx);
+ let stream = self.stream_completion(request, cx);
async move {
let mapper = DeepSeekEventMapper::new();
@@ -370,20 +370,16 @@ impl LanguageModel for GoogleLanguageModel {
LanguageModelCompletionError,
>,
> {
- let bypass_rate_limit = request.bypass_rate_limit;
let request = into_google(
request,
self.model.request_id().to_string(),
self.model.mode(),
);
let request = self.stream_completion(request, cx);
- let future = self.request_limiter.stream_with_bypass(
- async move {
- let response = request.await.map_err(LanguageModelCompletionError::from)?;
- Ok(GoogleEventMapper::new().map_stream(response))
- },
- bypass_rate_limit,
- );
+ let future = self.request_limiter.stream(async move {
+ let response = request.await.map_err(LanguageModelCompletionError::from)?;
+ Ok(GoogleEventMapper::new().map_stream(response))
+ });
async move { Ok(future.await?.boxed()) }.boxed()
}
}
@@ -370,7 +370,6 @@ impl LmStudioLanguageModel {
fn stream_completion(
&self,
request: lmstudio::ChatCompletionRequest,
- bypass_rate_limit: bool,
cx: &AsyncApp,
) -> BoxFuture<
'static,
@@ -382,15 +381,11 @@ impl LmStudioLanguageModel {
settings.api_url.clone()
});
- let future = self.request_limiter.stream_with_bypass(
- async move {
- let request =
- lmstudio::stream_chat_completion(http_client.as_ref(), &api_url, request);
- let response = request.await?;
- Ok(response)
- },
- bypass_rate_limit,
- );
+ let future = self.request_limiter.stream(async move {
+ let request = lmstudio::stream_chat_completion(http_client.as_ref(), &api_url, request);
+ let response = request.await?;
+ Ok(response)
+ });
async move { Ok(future.await?.boxed()) }.boxed()
}
@@ -465,9 +460,8 @@ impl LanguageModel for LmStudioLanguageModel {
LanguageModelCompletionError,
>,
> {
- let bypass_rate_limit = request.bypass_rate_limit;
let request = self.to_lmstudio_request(request);
- let completions = self.stream_completion(request, bypass_rate_limit, cx);
+ let completions = self.stream_completion(request, cx);
async move {
let mapper = LmStudioEventMapper::new();
Ok(mapper.map_stream(completions.await?).boxed())
@@ -264,7 +264,6 @@ impl MistralLanguageModel {
fn stream_completion(
&self,
request: mistral::Request,
- bypass_rate_limit: bool,
cx: &AsyncApp,
) -> BoxFuture<
'static,
@@ -277,20 +276,17 @@ impl MistralLanguageModel {
(state.api_key_state.key(&api_url), api_url)
});
- let future = self.request_limiter.stream_with_bypass(
- async move {
- let Some(api_key) = api_key else {
- return Err(LanguageModelCompletionError::NoApiKey {
- provider: PROVIDER_NAME,
- });
- };
- let request =
- mistral::stream_completion(http_client.as_ref(), &api_url, &api_key, request);
- let response = request.await?;
- Ok(response)
- },
- bypass_rate_limit,
- );
+ let future = self.request_limiter.stream(async move {
+ let Some(api_key) = api_key else {
+ return Err(LanguageModelCompletionError::NoApiKey {
+ provider: PROVIDER_NAME,
+ });
+ };
+ let request =
+ mistral::stream_completion(http_client.as_ref(), &api_url, &api_key, request);
+ let response = request.await?;
+ Ok(response)
+ });
async move { Ok(future.await?.boxed()) }.boxed()
}
@@ -374,9 +370,8 @@ impl LanguageModel for MistralLanguageModel {
LanguageModelCompletionError,
>,
> {
- let bypass_rate_limit = request.bypass_rate_limit;
let request = into_mistral(request, self.model.clone(), self.max_output_tokens());
- let stream = self.stream_completion(request, bypass_rate_limit, cx);
+ let stream = self.stream_completion(request, cx);
async move {
let stream = stream.await?;
@@ -906,7 +901,6 @@ mod tests {
intent: None,
stop: vec![],
thinking_allowed: true,
- bypass_rate_limit: false,
};
let mistral_request = into_mistral(request, mistral::Model::MistralSmallLatest, None);
@@ -940,7 +934,6 @@ mod tests {
intent: None,
stop: vec![],
thinking_allowed: true,
- bypass_rate_limit: false,
};
let mistral_request = into_mistral(request, mistral::Model::Pixtral12BLatest, None);
@@ -480,7 +480,6 @@ impl LanguageModel for OllamaLanguageModel {
LanguageModelCompletionError,
>,
> {
- let bypass_rate_limit = request.bypass_rate_limit;
let request = self.to_ollama_request(request);
let http_client = self.http_client.clone();
@@ -489,20 +488,13 @@ impl LanguageModel for OllamaLanguageModel {
(state.api_key_state.key(&api_url), api_url)
});
- let future = self.request_limiter.stream_with_bypass(
- async move {
- let stream = stream_chat_completion(
- http_client.as_ref(),
- &api_url,
- api_key.as_deref(),
- request,
- )
- .await?;
- let stream = map_to_language_model_completion_events(stream);
- Ok(stream)
- },
- bypass_rate_limit,
- );
+ let future = self.request_limiter.stream(async move {
+ let stream =
+ stream_chat_completion(http_client.as_ref(), &api_url, api_key.as_deref(), request)
+ .await?;
+ let stream = map_to_language_model_completion_events(stream);
+ Ok(stream)
+ });
future.map_ok(|f| f.boxed()).boxed()
}
@@ -213,7 +213,6 @@ impl OpenAiLanguageModel {
fn stream_completion(
&self,
request: open_ai::Request,
- bypass_rate_limit: bool,
cx: &AsyncApp,
) -> BoxFuture<'static, Result<futures::stream::BoxStream<'static, Result<ResponseStreamEvent>>>>
{
@@ -224,24 +223,21 @@ impl OpenAiLanguageModel {
(state.api_key_state.key(&api_url), api_url)
});
- let future = self.request_limiter.stream_with_bypass(
- async move {
- let provider = PROVIDER_NAME;
- let Some(api_key) = api_key else {
- return Err(LanguageModelCompletionError::NoApiKey { provider });
- };
- let request = stream_completion(
- http_client.as_ref(),
- provider.0.as_str(),
- &api_url,
- &api_key,
- request,
- );
- let response = request.await?;
- Ok(response)
- },
- bypass_rate_limit,
- );
+ let future = self.request_limiter.stream(async move {
+ let provider = PROVIDER_NAME;
+ let Some(api_key) = api_key else {
+ return Err(LanguageModelCompletionError::NoApiKey { provider });
+ };
+ let request = stream_completion(
+ http_client.as_ref(),
+ provider.0.as_str(),
+ &api_url,
+ &api_key,
+ request,
+ );
+ let response = request.await?;
+ Ok(response)
+ });
async move { Ok(future.await?.boxed()) }.boxed()
}
@@ -249,7 +245,6 @@ impl OpenAiLanguageModel {
fn stream_response(
&self,
request: ResponseRequest,
- bypass_rate_limit: bool,
cx: &AsyncApp,
) -> BoxFuture<'static, Result<futures::stream::BoxStream<'static, Result<ResponsesStreamEvent>>>>
{
@@ -261,23 +256,20 @@ impl OpenAiLanguageModel {
});
let provider = PROVIDER_NAME;
- let future = self.request_limiter.stream_with_bypass(
- async move {
- let Some(api_key) = api_key else {
- return Err(LanguageModelCompletionError::NoApiKey { provider });
- };
- let request = stream_response(
- http_client.as_ref(),
- provider.0.as_str(),
- &api_url,
- &api_key,
- request,
- );
- let response = request.await?;
- Ok(response)
- },
- bypass_rate_limit,
- );
+ let future = self.request_limiter.stream(async move {
+ let Some(api_key) = api_key else {
+ return Err(LanguageModelCompletionError::NoApiKey { provider });
+ };
+ let request = stream_response(
+ http_client.as_ref(),
+ provider.0.as_str(),
+ &api_url,
+ &api_key,
+ request,
+ );
+ let response = request.await?;
+ Ok(response)
+ });
async move { Ok(future.await?.boxed()) }.boxed()
}
@@ -376,7 +368,6 @@ impl LanguageModel for OpenAiLanguageModel {
LanguageModelCompletionError,
>,
> {
- let bypass_rate_limit = request.bypass_rate_limit;
if self.model.supports_chat_completions() {
let request = into_open_ai(
request,
@@ -386,7 +377,7 @@ impl LanguageModel for OpenAiLanguageModel {
self.max_output_tokens(),
self.model.reasoning_effort(),
);
- let completions = self.stream_completion(request, bypass_rate_limit, cx);
+ let completions = self.stream_completion(request, cx);
async move {
let mapper = OpenAiEventMapper::new();
Ok(mapper.map_stream(completions.await?).boxed())
@@ -401,7 +392,7 @@ impl LanguageModel for OpenAiLanguageModel {
self.max_output_tokens(),
self.model.reasoning_effort(),
);
- let completions = self.stream_response(request, bypass_rate_limit, cx);
+ let completions = self.stream_response(request, cx);
async move {
let mapper = OpenAiResponseEventMapper::new();
Ok(mapper.map_stream(completions.await?).boxed())
@@ -561,7 +552,6 @@ pub fn into_open_ai_response(
stop: _,
temperature,
thinking_allowed: _,
- bypass_rate_limit: _,
} = request;
let mut input_items = Vec::new();
@@ -1444,7 +1434,6 @@ mod tests {
stop: vec![],
temperature: None,
thinking_allowed: true,
- bypass_rate_limit: false,
};
// Validate that all models are supported by tiktoken-rs
@@ -1581,7 +1570,6 @@ mod tests {
stop: vec!["<STOP>".into()],
temperature: None,
thinking_allowed: false,
- bypass_rate_limit: false,
};
let response = into_open_ai_response(
@@ -204,7 +204,6 @@ impl OpenAiCompatibleLanguageModel {
fn stream_completion(
&self,
request: open_ai::Request,
- bypass_rate_limit: bool,
cx: &AsyncApp,
) -> BoxFuture<
'static,
@@ -224,23 +223,20 @@ impl OpenAiCompatibleLanguageModel {
});
let provider = self.provider_name.clone();
- let future = self.request_limiter.stream_with_bypass(
- async move {
- let Some(api_key) = api_key else {
- return Err(LanguageModelCompletionError::NoApiKey { provider });
- };
- let request = stream_completion(
- http_client.as_ref(),
- provider.0.as_str(),
- &api_url,
- &api_key,
- request,
- );
- let response = request.await?;
- Ok(response)
- },
- bypass_rate_limit,
- );
+ let future = self.request_limiter.stream(async move {
+ let Some(api_key) = api_key else {
+ return Err(LanguageModelCompletionError::NoApiKey { provider });
+ };
+ let request = stream_completion(
+ http_client.as_ref(),
+ provider.0.as_str(),
+ &api_url,
+ &api_key,
+ request,
+ );
+ let response = request.await?;
+ Ok(response)
+ });
async move { Ok(future.await?.boxed()) }.boxed()
}
@@ -248,7 +244,6 @@ impl OpenAiCompatibleLanguageModel {
fn stream_response(
&self,
request: ResponseRequest,
- bypass_rate_limit: bool,
cx: &AsyncApp,
) -> BoxFuture<'static, Result<futures::stream::BoxStream<'static, Result<ResponsesStreamEvent>>>>
{
@@ -263,23 +258,20 @@ impl OpenAiCompatibleLanguageModel {
});
let provider = self.provider_name.clone();
- let future = self.request_limiter.stream_with_bypass(
- async move {
- let Some(api_key) = api_key else {
- return Err(LanguageModelCompletionError::NoApiKey { provider });
- };
- let request = stream_response(
- http_client.as_ref(),
- provider.0.as_str(),
- &api_url,
- &api_key,
- request,
- );
- let response = request.await?;
- Ok(response)
- },
- bypass_rate_limit,
- );
+ let future = self.request_limiter.stream(async move {
+ let Some(api_key) = api_key else {
+ return Err(LanguageModelCompletionError::NoApiKey { provider });
+ };
+ let request = stream_response(
+ http_client.as_ref(),
+ provider.0.as_str(),
+ &api_url,
+ &api_key,
+ request,
+ );
+ let response = request.await?;
+ Ok(response)
+ });
async move { Ok(future.await?.boxed()) }.boxed()
}
@@ -378,7 +370,6 @@ impl LanguageModel for OpenAiCompatibleLanguageModel {
LanguageModelCompletionError,
>,
> {
- let bypass_rate_limit = request.bypass_rate_limit;
if self.model.capabilities.chat_completions {
let request = into_open_ai(
request,
@@ -388,7 +379,7 @@ impl LanguageModel for OpenAiCompatibleLanguageModel {
self.max_output_tokens(),
None,
);
- let completions = self.stream_completion(request, bypass_rate_limit, cx);
+ let completions = self.stream_completion(request, cx);
async move {
let mapper = OpenAiEventMapper::new();
Ok(mapper.map_stream(completions.await?).boxed())
@@ -403,7 +394,7 @@ impl LanguageModel for OpenAiCompatibleLanguageModel {
self.max_output_tokens(),
None,
);
- let completions = self.stream_response(request, bypass_rate_limit, cx);
+ let completions = self.stream_response(request, cx);
async move {
let mapper = OpenAiResponseEventMapper::new();
Ok(mapper.map_stream(completions.await?).boxed())
@@ -368,16 +368,12 @@ impl LanguageModel for OpenRouterLanguageModel {
LanguageModelCompletionError,
>,
> {
- let bypass_rate_limit = request.bypass_rate_limit;
let openrouter_request = into_open_router(request, &self.model, self.max_output_tokens());
let request = self.stream_completion(openrouter_request, cx);
- let future = self.request_limiter.stream_with_bypass(
- async move {
- let response = request.await?;
- Ok(OpenRouterEventMapper::new().map_stream(response))
- },
- bypass_rate_limit,
- );
+ let future = self.request_limiter.stream(async move {
+ let response = request.await?;
+ Ok(OpenRouterEventMapper::new().map_stream(response))
+ });
async move { Ok(future.await?.boxed()) }.boxed()
}
}
@@ -193,7 +193,6 @@ impl VercelLanguageModel {
fn stream_completion(
&self,
request: open_ai::Request,
- bypass_rate_limit: bool,
cx: &AsyncApp,
) -> BoxFuture<'static, Result<futures::stream::BoxStream<'static, Result<ResponseStreamEvent>>>>
{
@@ -204,24 +203,21 @@ impl VercelLanguageModel {
(state.api_key_state.key(&api_url), api_url)
});
- let future = self.request_limiter.stream_with_bypass(
- async move {
- let provider = PROVIDER_NAME;
- let Some(api_key) = api_key else {
- return Err(LanguageModelCompletionError::NoApiKey { provider });
- };
- let request = open_ai::stream_completion(
- http_client.as_ref(),
- provider.0.as_str(),
- &api_url,
- &api_key,
- request,
- );
- let response = request.await?;
- Ok(response)
- },
- bypass_rate_limit,
- );
+ let future = self.request_limiter.stream(async move {
+ let provider = PROVIDER_NAME;
+ let Some(api_key) = api_key else {
+ return Err(LanguageModelCompletionError::NoApiKey { provider });
+ };
+ let request = open_ai::stream_completion(
+ http_client.as_ref(),
+ provider.0.as_str(),
+ &api_url,
+ &api_key,
+ request,
+ );
+ let response = request.await?;
+ Ok(response)
+ });
async move { Ok(future.await?.boxed()) }.boxed()
}
@@ -294,7 +290,6 @@ impl LanguageModel for VercelLanguageModel {
LanguageModelCompletionError,
>,
> {
- let bypass_rate_limit = request.bypass_rate_limit;
let request = crate::provider::open_ai::into_open_ai(
request,
self.model.id(),
@@ -303,7 +298,7 @@ impl LanguageModel for VercelLanguageModel {
self.max_output_tokens(),
None,
);
- let completions = self.stream_completion(request, bypass_rate_limit, cx);
+ let completions = self.stream_completion(request, cx);
async move {
let mapper = crate::provider::open_ai::OpenAiEventMapper::new();
Ok(mapper.map_stream(completions.await?).boxed())
@@ -197,7 +197,6 @@ impl XAiLanguageModel {
fn stream_completion(
&self,
request: open_ai::Request,
- bypass_rate_limit: bool,
cx: &AsyncApp,
) -> BoxFuture<
'static,
@@ -213,24 +212,21 @@ impl XAiLanguageModel {
(state.api_key_state.key(&api_url), api_url)
});
- let future = self.request_limiter.stream_with_bypass(
- async move {
- let provider = PROVIDER_NAME;
- let Some(api_key) = api_key else {
- return Err(LanguageModelCompletionError::NoApiKey { provider });
- };
- let request = open_ai::stream_completion(
- http_client.as_ref(),
- provider.0.as_str(),
- &api_url,
- &api_key,
- request,
- );
- let response = request.await?;
- Ok(response)
- },
- bypass_rate_limit,
- );
+ let future = self.request_limiter.stream(async move {
+ let provider = PROVIDER_NAME;
+ let Some(api_key) = api_key else {
+ return Err(LanguageModelCompletionError::NoApiKey { provider });
+ };
+ let request = open_ai::stream_completion(
+ http_client.as_ref(),
+ provider.0.as_str(),
+ &api_url,
+ &api_key,
+ request,
+ );
+ let response = request.await?;
+ Ok(response)
+ });
async move { Ok(future.await?.boxed()) }.boxed()
}
@@ -311,7 +307,6 @@ impl LanguageModel for XAiLanguageModel {
LanguageModelCompletionError,
>,
> {
- let bypass_rate_limit = request.bypass_rate_limit;
let request = crate::provider::open_ai::into_open_ai(
request,
self.model.id(),
@@ -320,7 +315,7 @@ impl LanguageModel for XAiLanguageModel {
self.max_output_tokens(),
None,
);
- let completions = self.stream_completion(request, bypass_rate_limit, cx);
+ let completions = self.stream_completion(request, cx);
async move {
let mapper = crate::provider::open_ai::OpenAiEventMapper::new();
Ok(mapper.map_stream(completions.await?).boxed())
@@ -1100,7 +1100,6 @@ impl RulesLibrary {
stop: Vec::new(),
temperature: None,
thinking_allowed: true,
- bypass_rate_limit: false,
},
cx,
)