LLM API Rate Limiting and Retry Strategies
Building production LLM applications means dealing with API failures. Rate limits, network timeouts, and transient errors are inevitable when you’re making thousands of API calls daily. The difference between a fragile proof-of-concept and a robust production system is how you handle these failures.
After processing millions of LLM API calls in production systems, I’ve learned that resilience isn’t optional—it’s fundamental. Let me share the patterns that actually work.
Understanding LLM Rate Limits
Different providers have different constraints:
OpenAI Rate Limits
# OpenAI enforces requests-per-minute (RPM), tokens-per-minute (TPM),
# and requests-per-day (RPD) quotas
OPENAI_LIMITS = {
    'gpt-4': {
        'rpm': 500,       # Requests per minute
        'tpm': 30_000,    # Tokens per minute
        'rpd': 10_000     # Requests per day
    },
    'gpt-3.5-turbo': {
        'rpm': 3_500,
        'tpm': 90_000,
        'rpd': 10_000
    }
}
Anthropic Rate Limits
# Anthropic uses usage tiers with different limits (Tier 1 shown here)
ANTHROPIC_LIMITS = {
    'claude-3-5-sonnet-20241022': {
        'rpm': 50,        # Tier 1
        'tpm': 40_000,
        'rpd': 5_000
    },
    'claude-3-5-haiku-20241022': {
        'rpm': 50,
        'tpm': 50_000,
        'rpd': 5_000
    }
}
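In practice, only one of these numbers binds at a time. A quick back-of-the-envelope check tells you whether RPM or TPM is your real ceiling for a given average request size (a sketch; the limit values are the illustrative Tier 1 numbers above and vary by account):

def binding_limit(rpm: int, tpm: int, avg_tokens_per_request: int) -> str:
    """Return which per-minute quota caps throughput for a workload."""
    max_requests_by_tpm = tpm / avg_tokens_per_request
    if max_requests_by_tpm < rpm:
        return f"TPM-bound: ~{max_requests_by_tpm:.0f} requests/min"
    return f"RPM-bound: {rpm} requests/min"

# With Tier 1 Claude limits and ~1,500 tokens per call,
# 40_000 / 1_500 is roughly 27 requests/min, well under the 50 RPM cap.
print(binding_limit(rpm=50, tpm=40_000, avg_tokens_per_request=1_500))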
The Real Challenge
The problem isn’t just hitting rate limits—it’s that:
- Limits are per-minute windows - One spike can lock you out
- Token counting is approximate - You can’t predict exactly
- Errors aren’t always rate limits - 500s, timeouts, and network issues need different handling (see the classification sketch after this list)
- Concurrent requests complicate everything - Multiple threads racing
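Before layering on rate limiters, it helps to separate errors you should retry from errors you should not. Here is a minimal classification sketch built on the exception types the anthropic Python SDK exposes; if you target another provider, substitute its exception hierarchy:

from anthropic import (
    RateLimitError,        # 429: back off and retry
    APIConnectionError,    # network issue: retry
    APITimeoutError,       # request timed out: retry
    InternalServerError,   # 5xx: retry
    BadRequestError,       # malformed request: do NOT retry
    AuthenticationError,   # bad key: do NOT retry
)

# Assumed split; tune it to your provider and your tolerance for duplicates.
RETRYABLE = (RateLimitError, APIConnectionError, APITimeoutError, InternalServerError)
NON_RETRYABLE = (BadRequestError, AuthenticationError)

def is_retryable(exc: Exception) -> bool:
    """Retry transient failures; surface client-side mistakes immediately."""
    return isinstance(exc, RETRYABLE) and not isinstance(exc, NON_RETRYABLE)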
The Foundation: Exponential Backoff
Start with the basics done right:
import time
import random
from typing import Optional, Callable, TypeVar, Any
from functools import wraps

T = TypeVar('T')

class RetryConfig:
    def __init__(
        self,
        max_retries: int = 5,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_base: float = 2.0,
        jitter: bool = True
    ):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base
        self.jitter = jitter

    def get_delay(self, attempt: int) -> float:
        """Calculate delay with exponential backoff and jitter"""
        delay = min(
            self.base_delay * (self.exponential_base ** attempt),
            self.max_delay
        )
        if self.jitter:
            # Add random jitter to prevent thundering herd
            delay *= (0.5 + random.random())
        return delay

def retry_with_backoff(
    config: Optional[RetryConfig] = None,
    retryable_exceptions: tuple = (Exception,)
):
    """Decorator for exponential backoff retry logic"""
    if config is None:
        config = RetryConfig()

    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @wraps(func)
        def wrapper(*args, **kwargs) -> T:
            last_exception = None
            for attempt in range(config.max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except retryable_exceptions as e:
                    last_exception = e
                    if attempt == config.max_retries:
                        raise
                    delay = config.get_delay(attempt)
                    print(f"Attempt {attempt + 1} failed: {e}")
                    print(f"Retrying in {delay:.2f}s...")
                    time.sleep(delay)
            raise last_exception
        return wrapper
    return decorator
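To make the schedule concrete, here is what the defaults above produce (base 1s, factor 2, cap 60s); with jitter enabled, each value is additionally scaled by a random factor between 0.5 and 1.5:

config = RetryConfig(jitter=False)   # disable jitter to show the raw curve
print([round(config.get_delay(a), 1) for a in range(6)])
# -> [1.0, 2.0, 4.0, 8.0, 16.0, 32.0]; the next attempt would cap at 60.0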
Usage Example
from anthropic import Anthropic, RateLimitError, APIError

@retry_with_backoff(
    config=RetryConfig(max_retries=5, base_delay=2.0),
    retryable_exceptions=(RateLimitError, APIError)
)
def call_claude(prompt: str) -> str:
    client = Anthropic()
    response = client.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}]
    )
    return response.content[0].text
Advanced Pattern: Token Bucket Rate Limiter
Simple backoff isn’t enough. You need proactive rate limiting:
import threading
import time
from typing import Optional

class TokenBucket:
    """Thread-safe token bucket rate limiter"""
    def __init__(
        self,
        rate: float,
        capacity: float,
        initial_tokens: Optional[float] = None
    ):
        self.rate = rate          # Tokens per second
        self.capacity = capacity  # Maximum burst size
        self.tokens = initial_tokens if initial_tokens is not None else capacity
        self.last_update = time.time()
        self.lock = threading.Lock()

    def _refill(self):
        """Refill tokens based on elapsed time"""
        now = time.time()
        elapsed = now - self.last_update
        self.tokens = min(
            self.capacity,
            self.tokens + (elapsed * self.rate)
        )
        self.last_update = now

    def consume(self, tokens: float = 1.0) -> bool:
        """Try to consume tokens, return True if successful"""
        with self.lock:
            self._refill()
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

    def wait_for_tokens(self, tokens: float = 1.0, timeout: Optional[float] = None) -> bool:
        """Block until tokens are available (or the timeout expires)"""
        start_time = time.time()
        while True:
            if self.consume(tokens):
                return True
            if timeout and (time.time() - start_time) > timeout:
                return False
            # Estimate how long until enough tokens accumulate
            with self.lock:
                self._refill()
                needed = tokens - self.tokens
            wait_time = needed / self.rate if self.rate > 0 else 1.0
            time.sleep(min(wait_time, 1.0))
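A quick sanity check of the bucket on its own (a sketch: 5 requests per second with a burst budget of 5):

bucket = TokenBucket(rate=5.0, capacity=5.0)

for i in range(8):
    if bucket.consume():
        print(f"request {i}: sent immediately")
    else:
        # Out of burst budget, so block until the bucket refills
        bucket.wait_for_tokens(1.0)
        print(f"request {i}: sent after waiting")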
Dual Rate Limiter (RPM + TPM)
class DualRateLimiter:
    """Rate limiter for both requests and tokens"""
    def __init__(self, rpm: int, tpm: int):
        # Refill at a per-second rate, but allow bursting up to the full
        # per-minute budget. If capacity were only the per-second rate,
        # a single large request could need more tokens than the bucket
        # can ever hold and would never be admitted.
        self.request_limiter = TokenBucket(
            rate=rpm / 60.0,
            capacity=rpm
        )
        self.token_limiter = TokenBucket(
            rate=tpm / 60.0,
            capacity=tpm
        )

    def acquire(self, estimated_tokens: int, timeout: Optional[float] = None) -> bool:
        """Acquire permission for a request with estimated token count"""
        # First check requests
        if not self.request_limiter.wait_for_tokens(1.0, timeout):
            return False
        # Then check tokens
        if not self.token_limiter.wait_for_tokens(estimated_tokens, timeout):
            # Release the request token since we failed
            with self.request_limiter.lock:
                self.request_limiter.tokens = min(
                    self.request_limiter.capacity,
                    self.request_limiter.tokens + 1.0
                )
            return False
        return True
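Wiring it up (a sketch using the illustrative Tier 1 numbers from earlier):

limiter = DualRateLimiter(rpm=50, tpm=40_000)

# Roughly 1,500 tokens of input plus headroom for the response
if limiter.acquire(estimated_tokens=2_500, timeout=30):
    print("ok to send the request")
else:
    print("gave up after 30s: shed load or queue the work")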
Production-Ready LLM Client
Combine all patterns into a robust client:
import time
import tiktoken
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, Dict, Any

@dataclass
class LLMResponse:
    content: str
    model: str
    tokens_used: int
    latency_ms: float
    attempt_count: int
    timestamp: datetime

class ResilientLLMClient:
    def __init__(
        self,
        api_key: str,
        model: str = "claude-3-5-sonnet-20241022",
        rpm: int = 50,
        tpm: int = 40_000,
        retry_config: Optional[RetryConfig] = None
    ):
        from anthropic import Anthropic
        self.client = Anthropic(api_key=api_key)
        self.model = model
        self.rate_limiter = DualRateLimiter(rpm=rpm, tpm=tpm)
        self.retry_config = retry_config or RetryConfig()
        # Token counting
        try:
            self.encoder = tiktoken.encoding_for_model("gpt-4")
        except Exception:
            self.encoder = tiktoken.get_encoding("cl100k_base")

    def _estimate_tokens(self, text: str) -> int:
        """Rough token estimate (Claude's tokenizer differs from GPT-4's,
        but a GPT-4 count is close enough for rate limiting)"""
        return len(self.encoder.encode(text))

    def _estimate_request_tokens(
        self,
        messages: list,
        max_tokens: int
    ) -> int:
        """Estimate total tokens for request"""
        # Input tokens
        input_text = " ".join(
            msg.get("content", "") for msg in messages
        )
        input_tokens = self._estimate_tokens(input_text)
        # Output tokens (max_tokens is the upper bound)
        return input_tokens + max_tokens

    def complete(
        self,
        messages: list,
        max_tokens: int = 1024,
        temperature: float = 1.0,
        timeout: Optional[float] = 60.0
    ) -> LLMResponse:
        """Make an LLM request with rate limiting and retries"""
        start_time = time.time()
        # Estimate tokens for rate limiting
        estimated_tokens = self._estimate_request_tokens(messages, max_tokens)
        # Acquire rate limit permission
        if not self.rate_limiter.acquire(estimated_tokens, timeout):
            raise TimeoutError(
                f"Could not acquire rate limit permission within {timeout}s"
            )
        # Retry logic
        last_exception = None
        for attempt in range(self.retry_config.max_retries + 1):
            try:
                response = self.client.messages.create(
                    model=self.model,
                    max_tokens=max_tokens,
                    temperature=temperature,
                    messages=messages
                )
                latency_ms = (time.time() - start_time) * 1000
                return LLMResponse(
                    content=response.content[0].text,
                    model=response.model,
                    tokens_used=response.usage.input_tokens + response.usage.output_tokens,
                    latency_ms=latency_ms,
                    attempt_count=attempt + 1,
                    timestamp=datetime.now()
                )
            except Exception as e:
                last_exception = e
                if attempt == self.retry_config.max_retries:
                    raise
                delay = self.retry_config.get_delay(attempt)
                print(f"Attempt {attempt + 1} failed: {e}")
                print(f"Retrying in {delay:.2f}s...")
                time.sleep(delay)
        raise last_exception
Usage
# Initialize client with rate limits
client = ResilientLLMClient(
    api_key="your-api-key",
    rpm=50,
    tpm=40_000,
    retry_config=RetryConfig(max_retries=5)
)

# Make requests - rate limiting and retries are automatic
response = client.complete(
    messages=[{
        "role": "user",
        "content": "Explain quantum computing"
    }],
    max_tokens=500
)

print(f"Response: {response.content}")
print(f"Tokens used: {response.tokens_used}")
print(f"Latency: {response.latency_ms:.0f}ms")
print(f"Attempts: {response.attempt_count}")
Concurrent Request Management
When making parallel requests, you need coordination:
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Callable, Dict, Any, Optional

class BatchLLMProcessor:
    def __init__(self, client: ResilientLLMClient, max_workers: int = 5):
        self.client = client
        self.max_workers = max_workers

    def process_batch(
        self,
        requests: List[Dict[str, Any]],
        progress_callback: Optional[Callable] = None
    ) -> List[Optional[LLMResponse]]:
        """Process multiple requests concurrently with rate limiting"""
        results = []
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all requests
            future_to_request = {
                executor.submit(
                    self.client.complete,
                    **request
                ): i
                for i, request in enumerate(requests)
            }
            # Collect results as they complete
            for future in as_completed(future_to_request):
                request_idx = future_to_request[future]
                try:
                    result = future.result()
                    results.append((request_idx, result))
                    if progress_callback:
                        progress_callback(len(results), len(requests))
                except Exception as e:
                    print(f"Request {request_idx} failed: {e}")
                    results.append((request_idx, None))
        # Sort by original order
        results.sort(key=lambda x: x[0])
        return [r[1] for r in results]
Usage Example
# Prepare batch of requests
requests = [
    {
        "messages": [{"role": "user", "content": f"Summarize topic {i}"}],
        "max_tokens": 200
    }
    for i in range(100)
]

# Process with progress tracking
def progress(completed, total):
    print(f"Progress: {completed}/{total} ({completed/total*100:.1f}%)")

processor = BatchLLMProcessor(client, max_workers=10)
results = processor.process_batch(requests, progress_callback=progress)

# Filter successful responses
successful = [r for r in results if r is not None]
print(f"Completed {len(successful)}/{len(requests)} requests")
Adaptive Rate Limiting
Learn from failures and adjust dynamically:
from collections import deque
from datetime import datetime

class AdaptiveRateLimiter:
    """Rate limiter that adapts based on API responses"""
    def __init__(
        self,
        initial_rpm: int,
        initial_tpm: int,
        window_size: int = 100
    ):
        self.target_rpm = initial_rpm
        self.target_tpm = initial_tpm
        # Track recent requests
        self.recent_requests = deque(maxlen=window_size)
        # Current rate limiter
        self.limiter = DualRateLimiter(rpm=initial_rpm, tpm=initial_tpm)
        # Adaptation parameters
        self.success_threshold = 0.95   # 95% success rate to increase
        self.failure_threshold = 0.85   # Below 85%, decrease

    def record_request(self, success: bool, was_rate_limited: bool):
        """Record request outcome"""
        self.recent_requests.append({
            'success': success,
            'rate_limited': was_rate_limited,
            'timestamp': datetime.now()
        })
        # Adapt if we have enough data
        if len(self.recent_requests) >= 20:
            self._adapt()

    def _adapt(self):
        """Adjust rate limits based on recent performance"""
        success_rate = sum(
            1 for r in self.recent_requests if r['success']
        ) / len(self.recent_requests)
        rate_limited_rate = sum(
            1 for r in self.recent_requests if r['rate_limited']
        ) / len(self.recent_requests)

        if success_rate > self.success_threshold and rate_limited_rate < 0.05:
            # Performing well, increase limits (in practice, cap these at
            # your provider's published limits)
            self.target_rpm = int(self.target_rpm * 1.1)
            self.target_tpm = int(self.target_tpm * 1.1)
            print(f"Increasing rate limits: RPM={self.target_rpm}, TPM={self.target_tpm}")
            self._update_limiter()
        elif success_rate < self.failure_threshold or rate_limited_rate > 0.15:
            # Too many failures, decrease limits
            self.target_rpm = int(self.target_rpm * 0.8)
            self.target_tpm = int(self.target_tpm * 0.8)
            print(f"Decreasing rate limits: RPM={self.target_rpm}, TPM={self.target_tpm}")
            self._update_limiter()

    def _update_limiter(self):
        """Create new limiter with updated limits"""
        self.limiter = DualRateLimiter(
            rpm=self.target_rpm,
            tpm=self.target_tpm
        )

    def acquire(self, estimated_tokens: int, timeout: Optional[float] = None) -> bool:
        """Acquire rate limit permission"""
        return self.limiter.acquire(estimated_tokens, timeout)
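A sketch of how the feedback loop gets wired in. It reuses the retry-decorated call_claude helper from earlier; the RateLimitError check mirrors the error classification shown near the top of the post:

from anthropic import RateLimitError

adaptive = AdaptiveRateLimiter(initial_rpm=50, initial_tpm=40_000)

def guarded_call(prompt: str) -> str:
    if not adaptive.acquire(estimated_tokens=2_000, timeout=30):
        raise TimeoutError("rate limit budget exhausted")
    try:
        text = call_claude(prompt)   # retry-decorated helper from earlier
        adaptive.record_request(success=True, was_rate_limited=False)
        return text
    except RateLimitError:
        adaptive.record_request(success=False, was_rate_limited=True)
        raise
    except Exception:
        adaptive.record_request(success=False, was_rate_limited=False)
        raise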
Circuit Breaker Pattern
Fail fast when the API is consistently down:
from enum import Enum
from typing import Callable
from datetime import datetime, timedelta

class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if recovered

class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        timeout: int = 60,
        half_open_attempts: int = 3
    ):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.half_open_attempts = half_open_attempts
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = None
        self.half_open_successes = 0

    def call(self, func: Callable, *args, **kwargs):
        """Execute function with circuit breaker protection"""
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
                self.half_open_successes = 0
                print("Circuit breaker entering HALF_OPEN state")
            else:
                raise Exception("Circuit breaker is OPEN")
        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except Exception:
            self._on_failure()
            raise

    def _should_attempt_reset(self) -> bool:
        """Check if enough time has passed to try again"""
        if not self.last_failure_time:
            return True
        return datetime.now() - self.last_failure_time > timedelta(
            seconds=self.timeout
        )

    def _on_success(self):
        """Handle successful request"""
        if self.state == CircuitState.HALF_OPEN:
            self.half_open_successes += 1
            if self.half_open_successes >= self.half_open_attempts:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
                self.half_open_successes = 0
                print("Circuit breaker CLOSED")
        else:
            self.failure_count = 0

    def _on_failure(self):
        """Handle failed request"""
        self.failure_count += 1
        self.last_failure_time = datetime.now()
        if self.state == CircuitState.HALF_OPEN:
            # Any failure during the trial period reopens the circuit
            self.state = CircuitState.OPEN
            self.half_open_successes = 0
            print("Circuit breaker back to OPEN")
        elif self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            print(f"Circuit breaker OPEN after {self.failure_count} failures")
Complete Production System
Combine all patterns:
class ProductionLLMClient:
    """Production-ready LLM client with all resilience patterns"""
    def __init__(
        self,
        api_key: str,
        model: str = "claude-3-5-sonnet-20241022",
        rpm: int = 50,
        tpm: int = 40_000
    ):
        from anthropic import Anthropic
        self.client = Anthropic(api_key=api_key)
        self.model = model
        # Adaptive rate limiting
        self.rate_limiter = AdaptiveRateLimiter(
            initial_rpm=rpm,
            initial_tpm=tpm
        )
        # Circuit breaker
        self.circuit_breaker = CircuitBreaker(
            failure_threshold=5,
            timeout=60
        )
        # Retry configuration
        self.retry_config = RetryConfig(max_retries=5)
        # Token estimation
        self.encoder = tiktoken.get_encoding("cl100k_base")

    def complete(
        self,
        messages: list,
        max_tokens: int = 1024,
        **kwargs
    ) -> LLMResponse:
        """Make resilient LLM request"""
        # Estimate tokens
        estimated_tokens = self._estimate_tokens(messages, max_tokens)
        # Acquire rate limit
        if not self.rate_limiter.acquire(estimated_tokens, timeout=60):
            raise TimeoutError("Could not acquire rate limit")

        # Execute with circuit breaker and retry
        def make_request():
            return self.circuit_breaker.call(
                self._make_request,
                messages,
                max_tokens,
                **kwargs
            )

        # Retry with backoff
        for attempt in range(self.retry_config.max_retries + 1):
            try:
                response = make_request()
                # Record success
                self.rate_limiter.record_request(
                    success=True,
                    was_rate_limited=False
                )
                return response
            except Exception as e:
                # Crude heuristic; isinstance(e, RateLimitError) is more precise
                is_rate_limit = "rate" in str(e).lower()
                # Record failure
                self.rate_limiter.record_request(
                    success=False,
                    was_rate_limited=is_rate_limit
                )
                if attempt == self.retry_config.max_retries:
                    raise
                delay = self.retry_config.get_delay(attempt)
                time.sleep(delay)

    def _make_request(self, messages, max_tokens, **kwargs):
        """Actual API call"""
        start_time = time.time()
        response = self.client.messages.create(
            model=self.model,
            max_tokens=max_tokens,
            messages=messages,
            **kwargs
        )
        return LLMResponse(
            content=response.content[0].text,
            model=response.model,
            tokens_used=response.usage.input_tokens + response.usage.output_tokens,
            latency_ms=(time.time() - start_time) * 1000,
            attempt_count=1,
            timestamp=datetime.now()
        )

    def _estimate_tokens(self, messages, max_tokens):
        """Estimate total tokens"""
        text = " ".join(m.get("content", "") for m in messages)
        return len(self.encoder.encode(text)) + max_tokens
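Usage mirrors the earlier client; the adaptive limiter and circuit breaker are internal details (a minimal sketch with a placeholder API key):

prod_client = ProductionLLMClient(api_key="your-api-key", rpm=50, tpm=40_000)

response = prod_client.complete(
    messages=[{"role": "user", "content": "Draft a release note for v2.1"}],
    max_tokens=300
)
print(response.content, f"({response.tokens_used} tokens)")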
Monitoring and Observability
Track what matters:
from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class LLMMetrics:
    total_requests: int = 0
    successful_requests: int = 0
    failed_requests: int = 0
    rate_limited_requests: int = 0
    total_tokens: int = 0
    total_latency_ms: float = 0
    retry_counts: List[int] = field(default_factory=list)

    @property
    def success_rate(self) -> float:
        if self.total_requests == 0:
            return 0
        return self.successful_requests / self.total_requests

    @property
    def avg_latency_ms(self) -> float:
        if self.successful_requests == 0:
            return 0
        return self.total_latency_ms / self.successful_requests

    @property
    def avg_retries(self) -> float:
        if not self.retry_counts:
            return 0
        return sum(self.retry_counts) / len(self.retry_counts)

    def record_response(self, response: Optional[LLMResponse], success: bool, rate_limited: bool):
        """Record response metrics (response may be None for failures)"""
        self.total_requests += 1
        if success:
            self.successful_requests += 1
            self.total_tokens += response.tokens_used
            self.total_latency_ms += response.latency_ms
            self.retry_counts.append(response.attempt_count - 1)
        else:
            self.failed_requests += 1
            if rate_limited:
                self.rate_limited_requests += 1

    def report(self):
        """Print metrics summary"""
        if self.total_requests == 0:
            print("No requests recorded yet")
            return
        print("\n=== LLM Metrics ===")
        print(f"Total Requests: {self.total_requests}")
        print(f"Success Rate: {self.success_rate:.1%}")
        print(f"Rate Limited: {self.rate_limited_requests} "
              f"({self.rate_limited_requests / self.total_requests:.1%})")
        print(f"Total Tokens: {self.total_tokens:,}")
        print(f"Avg Latency: {self.avg_latency_ms:.0f}ms")
        print(f"Avg Retries: {self.avg_retries:.2f}")
Key Takeaways
Production LLM resilience requires:
- Exponential backoff - With jitter to prevent thundering herd
- Proactive rate limiting - Don’t wait for 429 errors
- Token-aware limiting - Track both RPM and TPM
- Circuit breakers - Fail fast when APIs are down
- Adaptive limits - Learn from failures and adjust
- Comprehensive monitoring - Track success rates, latency, retries
The patterns I’ve shared handle millions of API calls in production. Start with basic retry logic, then layer in sophistication as your scale demands it.
Building AI systems that don’t fall over. One retry at a time.