LLM API Rate Limiting and Retry Strategies

Ryan Dahlberg
October 22, 2025 · 12 min read

Building production LLM applications means dealing with API failures. Rate limits, network timeouts, and transient errors are inevitable when you’re making thousands of API calls daily. The difference between a fragile proof-of-concept and a robust production system is how you handle these failures.

After processing millions of LLM API calls in production systems, I’ve learned that resilience isn’t optional—it’s fundamental. Let me share the patterns that actually work.

Understanding LLM Rate Limits

Different providers have different constraints:

OpenAI Rate Limits

# OpenAI enforces requests-per-minute (RPM) and tokens-per-minute (TPM) limits.
# Example values only; actual limits vary by model and your account's usage tier.
OPENAI_LIMITS = {
    'gpt-4': {
        'rpm': 500,        # Requests per minute
        'tpm': 30_000,     # Tokens per minute
        'rpd': 10_000      # Requests per day
    },
    'gpt-3.5-turbo': {
        'rpm': 3_500,
        'tpm': 90_000,
        'rpd': 10_000
    }
}

Anthropic Rate Limits

# Anthropic limits also vary by usage tier; these are representative Tier 1 values
ANTHROPIC_LIMITS = {
    'claude-3-5-sonnet-20241022': {
        'rpm': 50,         # Tier 1
        'tpm': 40_000,
        'rpd': 5_000
    },
    'claude-3-5-haiku-20241022': {
        'rpm': 50,
        'tpm': 50_000,
        'rpd': 5_000
    }
}

The Real Challenge

The problem isn’t just hitting rate limits—it’s that:

  1. Limits are per-minute windows - One spike can lock you out
  2. Token counting is approximate - You can’t predict exactly
  3. Errors aren’t always rate limits - 500s, timeouts, network issues (see the classification sketch after this list)
  4. Concurrent requests complicate everything - Multiple threads racing
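
Point 3 deserves special attention: only some failures are worth retrying. Here is a minimal classification sketch, assuming the exception types exported by the current Anthropic Python SDK (verify the names against the version you pin):

from anthropic import APIConnectionError, APIStatusError, RateLimitError

def is_retryable(error: Exception) -> bool:
    """Decide whether a failure is worth retrying."""
    if isinstance(error, RateLimitError):
        return True                      # 429: back off and retry
    if isinstance(error, APIConnectionError):
        return True                      # network hiccups and timeouts
    if isinstance(error, APIStatusError):
        return error.status_code >= 500  # server-side errors; most 4xx are our fault
    return False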

The Foundation: Exponential Backoff

Start with the basics done right:

import time
import random
from typing import Optional, Callable, TypeVar, Any
from functools import wraps

T = TypeVar('T')

class RetryConfig:
    def __init__(
        self,
        max_retries: int = 5,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_base: float = 2.0,
        jitter: bool = True
    ):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base
        self.jitter = jitter

    def get_delay(self, attempt: int) -> float:
        """Calculate delay with exponential backoff and jitter"""
        delay = min(
            self.base_delay * (self.exponential_base ** attempt),
            self.max_delay
        )

        if self.jitter:
            # Add random jitter to prevent thundering herd
            delay *= (0.5 + random.random())

        return delay

def retry_with_backoff(
    config: Optional[RetryConfig] = None,
    retryable_exceptions: tuple = (Exception,)
):
    """Decorator for exponential backoff retry logic"""
    if config is None:
        config = RetryConfig()

    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @wraps(func)
        def wrapper(*args, **kwargs) -> T:
            last_exception = None

            for attempt in range(config.max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except retryable_exceptions as e:
                    last_exception = e

                    if attempt == config.max_retries:
                        raise

                    delay = config.get_delay(attempt)
                    print(f"Attempt {attempt + 1} failed: {e}")
                    print(f"Retrying in {delay:.2f}s...")
                    time.sleep(delay)

            raise last_exception

        return wrapper
    return decorator

Usage Example

from anthropic import Anthropic, RateLimitError, APIError

@retry_with_backoff(
    config=RetryConfig(max_retries=5, base_delay=2.0),
    retryable_exceptions=(RateLimitError, APIError)
)
def call_claude(prompt: str) -> str:
    client = Anthropic()
    response = client.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}]
    )
    return response.content[0].text
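
The same decorator works unchanged with the OpenAI SDK. A sketch assuming the openai>=1.x client (the model name is illustrative):

from openai import OpenAI, RateLimitError, APIError

@retry_with_backoff(
    config=RetryConfig(max_retries=5, base_delay=2.0),
    retryable_exceptions=(RateLimitError, APIError)
)
def call_gpt(prompt: str) -> str:
    client = OpenAI()
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content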

Advanced Pattern: Token Bucket Rate Limiter

Simple backoff isn’t enough. You need proactive rate limiting:

import threading
import time
from typing import Optional

class TokenBucket:
    """Thread-safe token bucket rate limiter"""

    def __init__(
        self,
        rate: float,
        capacity: float,
        initial_tokens: Optional[float] = None
    ):
        self.rate = rate  # Tokens per second
        self.capacity = capacity
        self.tokens = initial_tokens if initial_tokens is not None else capacity
        self.last_update = time.time()
        self.lock = threading.Lock()

    def _refill(self):
        """Refill tokens based on elapsed time"""
        now = time.time()
        elapsed = now - self.last_update
        self.tokens = min(
            self.capacity,
            self.tokens + (elapsed * self.rate)
        )
        self.last_update = now

    def consume(self, tokens: float = 1.0) -> bool:
        """Try to consume tokens, return True if successful"""
        with self.lock:
            self._refill()
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

    def wait_for_tokens(self, tokens: float = 1.0, timeout: Optional[float] = None):
        """Block until tokens are available"""
        start_time = time.time()

        while True:
            if self.consume(tokens):
                return True

            if timeout is not None and (time.time() - start_time) > timeout:
                return False

            # Estimate how long until enough tokens should be available
            with self.lock:
                self._refill()
                needed = max(tokens - self.tokens, 0.0)
                wait_time = needed / self.rate if self.rate > 0 else 1.0

            time.sleep(min(wait_time, 1.0))
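
A quick sanity check of the bucket on its own (the numbers are illustrative):

# Allow roughly 5 requests per second with bursts of up to 10
bucket = TokenBucket(rate=5.0, capacity=10.0)

for i in range(15):
    if bucket.consume():
        print(f"request {i}: allowed immediately")
    else:
        # Burst capacity exhausted: block until the bucket refills
        bucket.wait_for_tokens(1.0)
        print(f"request {i}: allowed after waiting")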

Dual Rate Limiter (RPM + TPM)

class DualRateLimiter:
    """Rate limiter for both requests and tokens"""

    def __init__(self, rpm: int, tpm: int):
        # Convert per-minute limits to per-second refill rates.
        # Capacity is the full per-minute budget so short bursts (and single
        # large requests) can still fit in the bucket.
        self.request_limiter = TokenBucket(
            rate=rpm / 60.0,
            capacity=float(rpm)
        )
        self.token_limiter = TokenBucket(
            rate=tpm / 60.0,
            capacity=float(tpm)
        )

    def acquire(self, estimated_tokens: int, timeout: Optional[float] = None) -> bool:
        """Acquire permission for a request with estimated token count"""
        # First check requests
        if not self.request_limiter.wait_for_tokens(1.0, timeout):
            return False

        # Then check tokens
        if not self.token_limiter.wait_for_tokens(estimated_tokens, timeout):
            # Release the request token since we failed
            with self.request_limiter.lock:
                self.request_limiter.tokens = min(
                    self.request_limiter.capacity,
                    self.request_limiter.tokens + 1.0
                )
            return False

        return True
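
Wiring the dual limiter in front of an API call looks roughly like this, reusing call_claude from the earlier usage example (the Tier 1 numbers match the table above; the token estimate is a placeholder):

limiter = DualRateLimiter(rpm=50, tpm=40_000)

estimated_tokens = 1_500  # rough input + max output estimate for this call
if limiter.acquire(estimated_tokens, timeout=30):
    result = call_claude("Summarize the token bucket algorithm")
else:
    print("Rate limit budget exhausted, try again later")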

Production-Ready LLM Client

Combine all patterns into a robust client:

import tiktoken
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, Dict, Any

@dataclass
class LLMResponse:
    content: str
    model: str
    tokens_used: int
    latency_ms: float
    attempt_count: int
    timestamp: datetime

class ResilientLLMClient:
    def __init__(
        self,
        api_key: str,
        model: str = "claude-3-5-sonnet-20241022",
        rpm: int = 50,
        tpm: int = 40_000,
        retry_config: Optional[RetryConfig] = None
    ):
        from anthropic import Anthropic

        self.client = Anthropic(api_key=api_key)
        self.model = model
        self.rate_limiter = DualRateLimiter(rpm=rpm, tpm=tpm)
        self.retry_config = retry_config or RetryConfig()

        # Token counting (tiktoken is a rough proxy; Claude's tokenizer differs,
        # but the estimate is close enough for rate-limit budgeting)
        try:
            self.encoder = tiktoken.encoding_for_model("gpt-4")
        except Exception:
            self.encoder = tiktoken.get_encoding("cl100k_base")

    def _estimate_tokens(self, text: str) -> int:
        """Estimate token count (approximate; Claude tokenizes differently)"""
        return len(self.encoder.encode(text))

    def _estimate_request_tokens(
        self,
        messages: list,
        max_tokens: int
    ) -> int:
        """Estimate total tokens for request"""
        # Input tokens
        input_text = " ".join(
            msg.get("content", "") for msg in messages
        )
        input_tokens = self._estimate_tokens(input_text)

        # Output tokens (max_tokens is the limit)
        return input_tokens + max_tokens

    def complete(
        self,
        messages: list,
        max_tokens: int = 1024,
        temperature: float = 1.0,
        timeout: Optional[float] = 60.0
    ) -> LLMResponse:
        """Make an LLM request with rate limiting and retries"""
        start_time = time.time()

        # Estimate tokens for rate limiting
        estimated_tokens = self._estimate_request_tokens(messages, max_tokens)

        # Acquire rate limit permission
        if not self.rate_limiter.acquire(estimated_tokens, timeout):
            raise TimeoutError(
                f"Could not acquire rate limit permission within {timeout}s"
            )

        # Retry logic
        last_exception = None
        for attempt in range(self.retry_config.max_retries + 1):
            try:
                response = self.client.messages.create(
                    model=self.model,
                    max_tokens=max_tokens,
                    temperature=temperature,
                    messages=messages
                )

                latency_ms = (time.time() - start_time) * 1000

                return LLMResponse(
                    content=response.content[0].text,
                    model=response.model,
                    tokens_used=response.usage.input_tokens + response.usage.output_tokens,
                    latency_ms=latency_ms,
                    attempt_count=attempt + 1,
                    timestamp=datetime.now()
                )

            except Exception as e:
                last_exception = e

                if attempt == self.retry_config.max_retries:
                    raise

                delay = self.retry_config.get_delay(attempt)
                print(f"Attempt {attempt + 1} failed: {e}")
                print(f"Retrying in {delay:.2f}s...")
                time.sleep(delay)

        raise last_exception

Usage

# Initialize client with rate limits
client = ResilientLLMClient(
    api_key="your-api-key",
    rpm=50,
    tpm=40_000,
    retry_config=RetryConfig(max_retries=5)
)

# Make requests - rate limiting and retries are automatic
response = client.complete(
    messages=[{
        "role": "user",
        "content": "Explain quantum computing"
    }],
    max_tokens=500
)

print(f"Response: {response.content}")
print(f"Tokens used: {response.tokens_used}")
print(f"Latency: {response.latency_ms:.0f}ms")
print(f"Attempts: {response.attempt_count}")

Concurrent Request Management

When making parallel requests, you need coordination:

from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, Dict, List, Optional

class BatchLLMProcessor:
    def __init__(self, client: ResilientLLMClient, max_workers: int = 5):
        self.client = client
        self.max_workers = max_workers

    def process_batch(
        self,
        requests: List[Dict[str, Any]],
        progress_callback: Optional[Callable] = None
    ) -> List[LLMResponse]:
        """Process multiple requests concurrently with rate limiting"""
        results = []

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all requests
            future_to_request = {
                executor.submit(
                    self.client.complete,
                    **request
                ): i
                for i, request in enumerate(requests)
            }

            # Collect results as they complete
            for future in as_completed(future_to_request):
                request_idx = future_to_request[future]

                try:
                    result = future.result()
                    results.append((request_idx, result))

                    if progress_callback:
                        progress_callback(len(results), len(requests))

                except Exception as e:
                    print(f"Request {request_idx} failed: {e}")
                    results.append((request_idx, None))

        # Sort by original order
        results.sort(key=lambda x: x[0])
        return [r[1] for r in results]

Usage Example

# Prepare batch of requests
requests = [
    {
        "messages": [{"role": "user", "content": f"Summarize topic {i}"}],
        "max_tokens": 200
    }
    for i in range(100)
]

# Process with progress tracking
def progress(completed, total):
    print(f"Progress: {completed}/{total} ({completed/total*100:.1f}%)")

processor = BatchLLMProcessor(client, max_workers=10)
results = processor.process_batch(requests, progress_callback=progress)

# Filter successful responses
successful = [r for r in results if r is not None]
print(f"Completed {len(successful)}/{len(requests)} requests")

Adaptive Rate Limiting

Learn from failures and adjust dynamically:

from collections import deque
from datetime import datetime, timedelta

class AdaptiveRateLimiter:
    """Rate limiter that adapts based on API responses"""

    def __init__(
        self,
        initial_rpm: int,
        initial_tpm: int,
        window_size: int = 100
    ):
        self.target_rpm = initial_rpm
        self.target_tpm = initial_tpm

        # Track recent requests
        self.recent_requests = deque(maxlen=window_size)

        # Current rate limiter
        self.limiter = DualRateLimiter(rpm=initial_rpm, tpm=initial_tpm)

        # Adaptation parameters
        self.success_threshold = 0.95  # 95% success rate to increase
        self.failure_threshold = 0.85  # Below 85% decrease

    def record_request(self, success: bool, was_rate_limited: bool):
        """Record request outcome"""
        self.recent_requests.append({
            'success': success,
            'rate_limited': was_rate_limited,
            'timestamp': datetime.now()
        })

        # Adapt if we have enough data
        if len(self.recent_requests) >= 20:
            self._adapt()

    def _adapt(self):
        """Adjust rate limits based on recent performance"""
        success_rate = sum(
            1 for r in self.recent_requests if r['success']
        ) / len(self.recent_requests)

        rate_limited_rate = sum(
            1 for r in self.recent_requests if r['rate_limited']
        ) / len(self.recent_requests)

        if success_rate > self.success_threshold and rate_limited_rate < 0.05:
            # Performing well, increase limits (in practice, cap these at the
            # provider's documented limits for your tier)
            self.target_rpm = int(self.target_rpm * 1.1)
            self.target_tpm = int(self.target_tpm * 1.1)
            print(f"Increasing rate limits: RPM={self.target_rpm}, TPM={self.target_tpm}")
            self._update_limiter()

        elif success_rate < self.failure_threshold or rate_limited_rate > 0.15:
            # Too many failures, decrease limits
            self.target_rpm = int(self.target_rpm * 0.8)
            self.target_tpm = int(self.target_tpm * 0.8)
            print(f"Decreasing rate limits: RPM={self.target_rpm}, TPM={self.target_tpm}")
            self._update_limiter()

    def _update_limiter(self):
        """Create new limiter with updated limits"""
        self.limiter = DualRateLimiter(
            rpm=self.target_rpm,
            tpm=self.target_tpm
        )

    def acquire(self, estimated_tokens: int, timeout: Optional[float] = None) -> bool:
        """Acquire rate limit permission"""
        return self.limiter.acquire(estimated_tokens, timeout)
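
The limiter only adapts if you feed outcomes back into it. A minimal wiring sketch, reusing call_claude from earlier and assuming the Anthropic SDK's RateLimitError for classification:

from anthropic import RateLimitError

adaptive = AdaptiveRateLimiter(initial_rpm=50, initial_tpm=40_000)

def guarded_call(prompt: str) -> Optional[str]:
    if not adaptive.acquire(estimated_tokens=1_500, timeout=30):
        return None  # budget exhausted within the timeout

    try:
        result = call_claude(prompt)
        adaptive.record_request(success=True, was_rate_limited=False)
        return result
    except Exception as e:
        adaptive.record_request(
            success=False,
            was_rate_limited=isinstance(e, RateLimitError)
        )
        raise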

Circuit Breaker Pattern

Fail fast when the API is consistently down:

from enum import Enum
from datetime import datetime, timedelta

class CircuitState(Enum):
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if recovered

class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        timeout: int = 60,
        half_open_attempts: int = 3
    ):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.half_open_attempts = half_open_attempts

        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = None
        self.half_open_successes = 0

    def call(self, func: Callable, *args, **kwargs):
        """Execute function with circuit breaker protection"""
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
                print("Circuit breaker entering HALF_OPEN state")
            else:
                raise Exception("Circuit breaker is OPEN")

        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except Exception as e:
            self._on_failure()
            raise

    def _should_attempt_reset(self) -> bool:
        """Check if enough time has passed to try again"""
        if not self.last_failure_time:
            return True

        return datetime.now() - self.last_failure_time > timedelta(
            seconds=self.timeout
        )

    def _on_success(self):
        """Handle successful request"""
        if self.state == CircuitState.HALF_OPEN:
            self.half_open_successes += 1
            if self.half_open_successes >= self.half_open_attempts:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
                self.half_open_successes = 0
                print("Circuit breaker CLOSED")
        else:
            self.failure_count = 0

    def _on_failure(self):
        """Handle failed request"""
        self.failure_count += 1
        self.last_failure_time = datetime.now()

        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            print(f"Circuit breaker OPEN after {self.failure_count} failures")

        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.OPEN
            self.half_open_successes = 0
            print("Circuit breaker back to OPEN")

Complete Production System

Combine all patterns:

class ProductionLLMClient:
    """Production-ready LLM client with all resilience patterns"""

    def __init__(
        self,
        api_key: str,
        model: str = "claude-3-5-sonnet-20241022",
        rpm: int = 50,
        tpm: int = 40_000
    ):
        from anthropic import Anthropic

        self.client = Anthropic(api_key=api_key)
        self.model = model

        # Adaptive rate limiting
        self.rate_limiter = AdaptiveRateLimiter(
            initial_rpm=rpm,
            initial_tpm=tpm
        )

        # Circuit breaker
        self.circuit_breaker = CircuitBreaker(
            failure_threshold=5,
            timeout=60
        )

        # Retry configuration
        self.retry_config = RetryConfig(max_retries=5)

        # Token estimation
        self.encoder = tiktoken.get_encoding("cl100k_base")

    def complete(
        self,
        messages: list,
        max_tokens: int = 1024,
        **kwargs
    ) -> LLMResponse:
        """Make resilient LLM request"""
        # Estimate tokens
        estimated_tokens = self._estimate_tokens(messages, max_tokens)

        # Acquire rate limit
        if not self.rate_limiter.acquire(estimated_tokens, timeout=60):
            raise TimeoutError("Could not acquire rate limit")

        # Execute with circuit breaker and retry
        def make_request():
            return self.circuit_breaker.call(
                self._make_request,
                messages,
                max_tokens,
                **kwargs
            )

        # Retry with backoff
        for attempt in range(self.retry_config.max_retries + 1):
            try:
                response = make_request()

                # Record success
                self.rate_limiter.record_request(
                    success=True,
                    was_rate_limited=False
                )

                return response

            except Exception as e:
                # Crude heuristic; checking isinstance(e, RateLimitError) is more reliable
                is_rate_limit = "rate" in str(e).lower()

                # Record failure
                self.rate_limiter.record_request(
                    success=False,
                    was_rate_limited=is_rate_limit
                )

                if attempt == self.retry_config.max_retries:
                    raise

                delay = self.retry_config.get_delay(attempt)
                time.sleep(delay)

    def _make_request(self, messages, max_tokens, **kwargs):
        """Actual API call"""
        start_time = time.time()

        response = self.client.messages.create(
            model=self.model,
            max_tokens=max_tokens,
            messages=messages,
            **kwargs
        )

        return LLMResponse(
            content=response.content[0].text,
            model=response.model,
            tokens_used=response.usage.input_tokens + response.usage.output_tokens,
            latency_ms=(time.time() - start_time) * 1000,
            attempt_count=1,
            timestamp=datetime.now()
        )

    def _estimate_tokens(self, messages, max_tokens):
        """Estimate total tokens"""
        text = " ".join(m.get("content", "") for m in messages)
        return len(self.encoder.encode(text)) + max_tokens

Monitoring and Observability

Track what matters:

from dataclasses import dataclass, field
from typing import List

@dataclass
class LLMMetrics:
    total_requests: int = 0
    successful_requests: int = 0
    failed_requests: int = 0
    rate_limited_requests: int = 0
    total_tokens: int = 0
    total_latency_ms: float = 0
    retry_counts: List[int] = field(default_factory=list)

    @property
    def success_rate(self) -> float:
        if self.total_requests == 0:
            return 0
        return self.successful_requests / self.total_requests

    @property
    def avg_latency_ms(self) -> float:
        if self.successful_requests == 0:
            return 0
        return self.total_latency_ms / self.successful_requests

    @property
    def avg_retries(self) -> float:
        if not self.retry_counts:
            return 0
        return sum(self.retry_counts) / len(self.retry_counts)

    def record_response(self, response: LLMResponse, success: bool, rate_limited: bool):
        """Record response metrics"""
        self.total_requests += 1

        if success:
            self.successful_requests += 1
            self.total_tokens += response.tokens_used
            self.total_latency_ms += response.latency_ms
            self.retry_counts.append(response.attempt_count - 1)
        else:
            self.failed_requests += 1

        if rate_limited:
            self.rate_limited_requests += 1

    def report(self):
        """Print metrics summary"""
        print(f"\n=== LLM Metrics ===")
        print(f"Total Requests: {self.total_requests}")
        print(f"Success Rate: {self.success_rate:.1%}")
        print(f"Rate Limited: {self.rate_limited_requests} ({self.rate_limited_requests/self.total_requests:.1%})")
        print(f"Total Tokens: {self.total_tokens:,}")
        print(f"Avg Latency: {self.avg_latency_ms:.0f}ms")
        print(f"Avg Retries: {self.avg_retries:.2f}")

Key Takeaways

Production LLM resilience requires:

  1. Exponential backoff - With jitter to prevent thundering herd
  2. Proactive rate limiting - Don’t wait for 429 errors
  3. Token-aware limiting - Track both RPM and TPM
  4. Circuit breakers - Fail fast when APIs are down
  5. Adaptive limits - Learn from failures and adjust
  6. Comprehensive monitoring - Track success rates, latency, retries

The patterns I’ve shared handle millions of API calls in production. Start with basic retry logic, then layer in sophistication as your scale demands it.


Building AI systems that don’t fall over. One retry at a time.

#LLM Integration #API Design #Rate Limiting #Resilience #Error Handling #Production