Agent Orchestration Patterns for Production: Building Reliable AI Systems at Scale
From Prototype to Production
Building a demo agent is easy. Deploying agents that handle production traffic reliably, cost-effectively, and observably at scale? That’s an entirely different challenge.
This guide shares battle-tested orchestration patterns from running AI agents in production. We’ll cover the architecture decisions, failure modes, and solutions that only become apparent when real users depend on your system.
The Production Reality Check
What Changes in Production?
Prototype Assumptions:
- APIs always respond quickly
- LLMs never time out
- Users provide clean input
- One request at a time
- Unlimited budget
- Simple debugging via print statements
Production Reality:
- APIs fail, time out, and rate-limit you
- LLMs hallucinate, refuse requests, and slow down
- Users send malformed, adversarial input
- Thousands of concurrent requests
- Every token costs money
- Complex distributed debugging needed
Pattern 1: Circuit Breaker for Agent Calls
Prevent cascading failures when agents or external services degrade.
from datetime import datetime, timedelta
from enum import Enum
import asyncio
from typing import Callable, Any
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Blocking calls
HALF_OPEN = "half_open" # Testing if service recovered
class CircuitBreaker:
"""
Circuit breaker for agent calls
Prevents cascading failures from unreliable services
"""
def __init__(
self,
failure_threshold: int = 5,
timeout: int = 60,
recovery_timeout: int = 30
):
self.failure_threshold = failure_threshold
self.timeout = timeout
self.recovery_timeout = recovery_timeout
self.failure_count = 0
self.last_failure_time = None
self.state = CircuitState.CLOSED
async def call(self, func: Callable, *args, **kwargs) -> Any:
"""Execute function with circuit breaker protection"""
# Check if circuit is open
if self.state == CircuitState.OPEN:
if self._should_attempt_recovery():
self.state = CircuitState.HALF_OPEN
print("Circuit breaker: Attempting recovery")
else:
raise Exception("Circuit breaker is OPEN")
try:
# Execute with timeout
result = await asyncio.wait_for(
func(*args, **kwargs),
timeout=self.timeout
)
# Success - reset failure count
if self.state == CircuitState.HALF_OPEN:
print("Circuit breaker: Recovery successful, closing circuit")
self._reset()
return result
except Exception as e:
# Failure - record it
self._record_failure()
raise e
def _record_failure(self):
"""Record a failure and potentially open circuit"""
self.failure_count += 1
self.last_failure_time = datetime.now()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
print(f"Circuit breaker: OPENED after {self.failure_count} failures")
def _should_attempt_recovery(self) -> bool:
"""Check if enough time has passed to attempt recovery"""
if self.last_failure_time is None:
return True
        time_since_failure = (datetime.now() - self.last_failure_time).total_seconds()
return time_since_failure >= self.recovery_timeout
def _reset(self):
"""Reset circuit breaker to closed state"""
self.failure_count = 0
self.last_failure_time = None
self.state = CircuitState.CLOSED
# Usage with agents
class ResilientAgent:
def __init__(self):
self.llm_breaker = CircuitBreaker(
failure_threshold=3,
timeout=30,
recovery_timeout=60
)
self.api_breaker = CircuitBreaker(
failure_threshold=5,
timeout=10,
recovery_timeout=30
)
async def call_llm(self, prompt: str) -> str:
"""Call LLM with circuit breaker protection"""
async def _call():
# Your LLM call here
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4", timeout=25)
return await llm.ainvoke(prompt)
try:
return await self.llm_breaker.call(_call)
except Exception as e:
# Fallback strategy
print(f"LLM call failed: {e}")
return "Service temporarily unavailable. Please try again."
async def call_external_api(self, endpoint: str) -> dict:
"""Call external API with circuit breaker protection"""
async def _call():
# Your API call here
import httpx
async with httpx.AsyncClient() as client:
response = await client.get(endpoint, timeout=8)
return response.json()
try:
return await self.api_breaker.call(_call)
except Exception as e:
print(f"API call failed: {e}")
return {"error": "API unavailable", "fallback": True}
# Example usage
async def main():
agent = ResilientAgent()
# Multiple requests - circuit will open if failures exceed threshold
for i in range(10):
try:
result = await agent.call_llm(f"Request {i}")
print(f"Success: {result[:50]}...")
except Exception as e:
print(f"Request {i} failed: {e}")
await asyncio.sleep(1)
# asyncio.run(main())
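Circuit breakers guard against sustained outages; transient failures (a single timeout or rate-limit response) are usually better absorbed by a bounded retry with exponential backoff before they ever count against the breaker. A minimal sketch, assuming the ResilientAgent above; the helper name call_with_retry and its defaults are illustrative:
import asyncio
import random
from typing import Any, Callable
async def call_with_retry(
    func: Callable,
    *args,
    max_attempts: int = 3,
    base_delay: float = 1.0,
    **kwargs
) -> Any:
    """Retry a coroutine with exponential backoff and jitter."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await func(*args, **kwargs)
        except Exception:
            if attempt == max_attempts:
                raise  # Out of retries - let the circuit breaker record the failure
            # Backoff: 1s, 2s, 4s, ... plus jitter to avoid thundering herds
            await asyncio.sleep(base_delay * 2 ** (attempt - 1) + random.uniform(0, 0.5))
# Wrapping retries inside the breaker so only exhausted retries count as failures:
# result = await agent.llm_breaker.call(call_with_retry, _call, max_attempts=3)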
Pattern 2: Request Queue with Priority
Manage load and ensure important requests are processed first.
import heapq
from dataclasses import dataclass, field
from typing import Any
import asyncio
from datetime import datetime
@dataclass(order=True)
class PrioritizedRequest:
"""Request with priority"""
priority: int
timestamp: datetime = field(compare=False)
request_id: str = field(compare=False)
payload: Any = field(compare=False)
future: asyncio.Future = field(compare=False, default=None)
class AgentRequestQueue:
"""
Priority queue for agent requests
Ensures high-priority requests are processed first
"""
def __init__(self, max_concurrent: int = 10):
self.queue = []
self.max_concurrent = max_concurrent
self.active_requests = 0
self.lock = asyncio.Lock()
self.semaphore = asyncio.Semaphore(max_concurrent)
async def submit(
self,
payload: Any,
priority: int = 5,
request_id: str = None
) -> Any:
"""
Submit request to queue
Priority: 1 (highest) to 10 (lowest)
"""
if request_id is None:
request_id = f"req_{datetime.now().timestamp()}"
# Create prioritized request
request = PrioritizedRequest(
priority=priority,
timestamp=datetime.now(),
request_id=request_id,
payload=payload,
future=asyncio.Future()
)
# Add to queue
async with self.lock:
heapq.heappush(self.queue, request)
# Process queue
asyncio.create_task(self._process_queue())
# Wait for result
return await request.future
async def _process_queue(self):
"""Process requests from queue"""
        # Wait for a concurrency slot (the semaphore allows up to max_concurrent at once)
        await self.semaphore.acquire()
try:
async with self.lock:
if not self.queue:
return
# Get highest priority request
request = heapq.heappop(self.queue)
# Process request
try:
result = await self._execute_request(request)
request.future.set_result(result)
except Exception as e:
request.future.set_exception(e)
finally:
self.semaphore.release()
# Process next request if available
if self.queue:
asyncio.create_task(self._process_queue())
async def _execute_request(self, request: PrioritizedRequest) -> Any:
"""Execute agent request"""
print(f"Processing {request.request_id} (priority {request.priority})")
# Simulate agent execution
await asyncio.sleep(2) # Replace with actual agent call
return {
"request_id": request.request_id,
"result": f"Processed: {request.payload}"
}
# Usage
async def main():
queue = AgentRequestQueue(max_concurrent=3)
# Submit requests with different priorities
tasks = [
queue.submit("Critical operation", priority=1, request_id="critical-1"),
queue.submit("Normal operation", priority=5, request_id="normal-1"),
queue.submit("Low priority task", priority=9, request_id="low-1"),
queue.submit("Another critical", priority=1, request_id="critical-2"),
queue.submit("Normal task", priority=5, request_id="normal-2"),
]
# Wait for all results
results = await asyncio.gather(*tasks)
for result in results:
print(f"Completed: {result}")
# asyncio.run(main())
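The queue above grows without bound, so sustained overload only shows up as rising latency. If you want backpressure, a small variant can shed low-priority work once the backlog passes a limit. A minimal sketch building on the AgentRequestQueue class above; the max_queue_size default and the priority > 3 cutoff are arbitrary choices:
class BoundedAgentRequestQueue(AgentRequestQueue):
    """AgentRequestQueue variant that sheds low-priority work when the backlog is deep."""
    def __init__(self, max_concurrent: int = 10, max_queue_size: int = 100):
        super().__init__(max_concurrent=max_concurrent)
        self.max_queue_size = max_queue_size
    async def submit(self, payload: Any, priority: int = 5, request_id: str = None) -> Any:
        # Cheap pre-check outside the lock (not atomic, which is acceptable for load shedding)
        if len(self.queue) >= self.max_queue_size and priority > 3:
            raise RuntimeError(
                f"Queue full ({self.max_queue_size} pending); "
                f"rejecting priority-{priority} request"
            )
        return await super().submit(payload, priority=priority, request_id=request_id)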
Pattern 3: Comprehensive Observability
Monitor agents in production with metrics, logging, and tracing.
from dataclasses import dataclass
from typing import Dict, List, Optional
import time
import json
from datetime import datetime
@dataclass
class AgentMetrics:
"""Metrics for agent execution"""
agent_id: str
request_id: str
start_time: float
end_time: float
duration_ms: float
tokens_used: int
cost_usd: float
success: bool
error: Optional[str] = None
    tool_calls: Optional[List[str]] = None
class AgentObservability:
"""
Comprehensive observability for agents
"""
def __init__(self):
self.metrics: List[AgentMetrics] = []
self.active_requests: Dict[str, float] = {}
def start_request(self, agent_id: str, request_id: str):
"""Track request start"""
self.active_requests[request_id] = time.time()
self._log({
"event": "request_start",
"agent_id": agent_id,
"request_id": request_id,
"timestamp": datetime.now().isoformat()
})
def end_request(
self,
agent_id: str,
request_id: str,
tokens_used: int,
cost_usd: float,
success: bool,
tool_calls: List[str] = None,
error: str = None
):
"""Track request completion"""
start_time = self.active_requests.pop(request_id, time.time())
end_time = time.time()
duration_ms = (end_time - start_time) * 1000
metrics = AgentMetrics(
agent_id=agent_id,
request_id=request_id,
start_time=start_time,
end_time=end_time,
duration_ms=duration_ms,
tokens_used=tokens_used,
cost_usd=cost_usd,
success=success,
error=error,
tool_calls=tool_calls or []
)
self.metrics.append(metrics)
self._log({
"event": "request_end",
"agent_id": agent_id,
"request_id": request_id,
"duration_ms": duration_ms,
"tokens_used": tokens_used,
"cost_usd": cost_usd,
"success": success,
"error": error,
"timestamp": datetime.now().isoformat()
})
# Alert on errors
if not success:
self._alert(f"Agent {agent_id} failed: {error}")
# Alert on high costs
if cost_usd > 1.0:
self._alert(f"High cost request: ${cost_usd:.2f}")
# Alert on slow requests
if duration_ms > 30000:
self._alert(f"Slow request: {duration_ms:.0f}ms")
def get_statistics(self) -> dict:
"""Calculate aggregate statistics"""
if not self.metrics:
return {}
total_requests = len(self.metrics)
successful = sum(1 for m in self.metrics if m.success)
failed = total_requests - successful
total_duration = sum(m.duration_ms for m in self.metrics)
total_tokens = sum(m.tokens_used for m in self.metrics)
total_cost = sum(m.cost_usd for m in self.metrics)
return {
"total_requests": total_requests,
"successful": successful,
"failed": failed,
"success_rate": successful / total_requests if total_requests > 0 else 0,
"avg_duration_ms": total_duration / total_requests if total_requests > 0 else 0,
"total_tokens": total_tokens,
"total_cost_usd": total_cost,
"avg_cost_per_request": total_cost / total_requests if total_requests > 0 else 0
}
def _log(self, data: dict):
"""Log structured data"""
# In production, send to logging service (DataDog, CloudWatch, etc.)
print(json.dumps(data))
def _alert(self, message: str):
"""Send alert"""
# In production, send to alerting service (PagerDuty, Slack, etc.)
print(f"⚠️ ALERT: {message}")
# Usage with agents
class ObservableAgent:
def __init__(self, agent_id: str, observability: AgentObservability):
self.agent_id = agent_id
self.obs = observability
async def execute(self, request_id: str, query: str) -> str:
"""Execute agent with observability"""
self.obs.start_request(self.agent_id, request_id)
tokens_used = 0
cost_usd = 0.0
tool_calls = []
error = None
success = True
try:
# Your agent execution here
from langchain_openai import ChatOpenAI
from langchain.callbacks import get_openai_callback
llm = ChatOpenAI(model="gpt-4")
with get_openai_callback() as cb:
result = await llm.ainvoke(query)
tokens_used = cb.total_tokens
cost_usd = cb.total_cost
return result.content
except Exception as e:
success = False
error = str(e)
raise e
finally:
self.obs.end_request(
agent_id=self.agent_id,
request_id=request_id,
tokens_used=tokens_used,
cost_usd=cost_usd,
success=success,
tool_calls=tool_calls,
error=error
)
# Example
async def main():
obs = AgentObservability()
agent = ObservableAgent("research-agent-1", obs)
# Execute requests
for i in range(5):
try:
result = await agent.execute(f"req-{i}", f"Query {i}")
print(f"Result {i}: {result[:50]}...")
except Exception as e:
print(f"Request {i} failed: {e}")
# View statistics
stats = obs.get_statistics()
print("\nStatistics:")
print(json.dumps(stats, indent=2))
# asyncio.run(main())
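The in-memory metrics list works for a single process, but production systems usually export to a metrics backend such as Prometheus (see Resources below). A minimal sketch using the prometheus_client package, assuming it is installed; the metric names and labels are illustrative:
from prometheus_client import Counter, Histogram, start_http_server
# Illustrative metric definitions
AGENT_REQUESTS = Counter(
    "agent_requests_total", "Completed agent requests", ["agent_id", "status"]
)
AGENT_LATENCY = Histogram(
    "agent_request_duration_seconds", "Agent request latency", ["agent_id"]
)
AGENT_COST = Counter(
    "agent_cost_usd_total", "Cumulative agent spend in USD", ["agent_id"]
)
def export_metrics(m: AgentMetrics):
    """Push a single AgentMetrics record into the Prometheus collectors."""
    status = "success" if m.success else "error"
    AGENT_REQUESTS.labels(agent_id=m.agent_id, status=status).inc()
    AGENT_LATENCY.labels(agent_id=m.agent_id).observe(m.duration_ms / 1000)
    AGENT_COST.labels(agent_id=m.agent_id).inc(m.cost_usd)
# Expose /metrics for Prometheus to scrape:
# start_http_server(9090)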
Pattern 4: Cost Control and Budget Management
Prevent runaway costs in production.
from datetime import datetime, timedelta
from typing import Optional
class CostGuard:
"""
Enforce cost limits for agent execution
"""
def __init__(
self,
hourly_limit: float = 10.0,
daily_limit: float = 100.0,
per_request_limit: float = 1.0
):
self.hourly_limit = hourly_limit
self.daily_limit = daily_limit
self.per_request_limit = per_request_limit
self.hourly_spend = 0.0
self.daily_spend = 0.0
self.hour_reset = datetime.now() + timedelta(hours=1)
self.day_reset = datetime.now() + timedelta(days=1)
def check_budget(self, estimated_cost: float) -> tuple[bool, Optional[str]]:
"""
Check if request can proceed within budget
Returns (can_proceed, reason_if_blocked)
"""
self._reset_counters()
# Check per-request limit
if estimated_cost > self.per_request_limit:
return False, f"Request cost ${estimated_cost:.2f} exceeds per-request limit ${self.per_request_limit:.2f}"
# Check hourly limit
if self.hourly_spend + estimated_cost > self.hourly_limit:
return False, f"Hourly budget ${self.hourly_limit:.2f} exceeded (current: ${self.hourly_spend:.2f})"
# Check daily limit
if self.daily_spend + estimated_cost > self.daily_limit:
return False, f"Daily budget ${self.daily_limit:.2f} exceeded (current: ${self.daily_spend:.2f})"
return True, None
def record_spend(self, cost: float):
"""Record actual spend"""
self._reset_counters()
self.hourly_spend += cost
self.daily_spend += cost
def _reset_counters(self):
"""Reset counters if time windows have passed"""
now = datetime.now()
if now >= self.hour_reset:
self.hourly_spend = 0.0
self.hour_reset = now + timedelta(hours=1)
if now >= self.day_reset:
self.daily_spend = 0.0
self.day_reset = now + timedelta(days=1)
def get_remaining_budget(self) -> dict:
"""Get remaining budget in each category"""
self._reset_counters()
return {
"hourly_remaining": self.hourly_limit - self.hourly_spend,
"daily_remaining": self.daily_limit - self.daily_spend,
"per_request_limit": self.per_request_limit
}
# Usage
class BudgetControlledAgent:
def __init__(self, cost_guard: CostGuard):
self.cost_guard = cost_guard
async def execute(self, query: str) -> str:
"""Execute with cost controls"""
# Estimate cost (rough approximation)
estimated_tokens = len(query.split()) * 1.3 # Rough estimate
estimated_cost = (estimated_tokens / 1000) * 0.03 # GPT-4 pricing
# Check budget
can_proceed, reason = self.cost_guard.check_budget(estimated_cost)
if not can_proceed:
raise Exception(f"Budget limit reached: {reason}")
# Execute
try:
from langchain_openai import ChatOpenAI
from langchain.callbacks import get_openai_callback
llm = ChatOpenAI(model="gpt-4")
with get_openai_callback() as cb:
result = await llm.ainvoke(query)
actual_cost = cb.total_cost
# Record actual spend
self.cost_guard.record_spend(actual_cost)
return result.content
except Exception as e:
raise e
# Example
async def main():
cost_guard = CostGuard(
hourly_limit=5.0,
daily_limit=50.0,
per_request_limit=0.50
)
agent = BudgetControlledAgent(cost_guard)
# Execute until budget exhausted
for i in range(100):
try:
result = await agent.execute(f"Query {i}" * 100) # Large query
print(f"Request {i} succeeded")
remaining = cost_guard.get_remaining_budget()
print(f"Remaining budget: ${remaining['hourly_remaining']:.2f}/hr, ${remaining['daily_remaining']:.2f}/day")
except Exception as e:
print(f"Request {i} blocked: {e}")
break
# asyncio.run(main())
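The word-count heuristic above is deliberately rough and under-counts for code-heavy or non-English prompts. If the tiktoken package is available, you can count tokens exactly before calling check_budget. A minimal sketch; the pricing figures and expected output length are assumptions to replace with your provider's current numbers:
import tiktoken
def estimate_request_cost(
    query: str,
    model: str = "gpt-4",
    prompt_cost_per_1k: float = 0.03,
    expected_output_tokens: int = 500,
    output_cost_per_1k: float = 0.06,
) -> float:
    """Estimate cost from real token counts instead of word counts."""
    encoding = tiktoken.encoding_for_model(model)
    prompt_tokens = len(encoding.encode(query))
    return (
        prompt_tokens / 1000 * prompt_cost_per_1k
        + expected_output_tokens / 1000 * output_cost_per_1k
    )
# can_proceed, reason = self.cost_guard.check_budget(estimate_request_cost(query))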
Pattern 5: Graceful Degradation
Provide fallback behavior when systems fail.
from enum import Enum
from typing import Callable, Optional
class ServiceTier(Enum):
"""Service quality tiers"""
PREMIUM = "premium" # Full AI capabilities
STANDARD = "standard" # Cached + limited AI
BASIC = "basic" # Cached only
EMERGENCY = "emergency" # Static fallback
class GracefulDegradation:
"""
Provide degraded service when full service unavailable
"""
def __init__(self):
self.current_tier = ServiceTier.PREMIUM
self.cache = {}
async def execute_with_fallback(
self,
request_id: str,
query: str,
premium_func: Callable,
standard_func: Optional[Callable] = None,
basic_func: Optional[Callable] = None
) -> tuple[str, ServiceTier]:
"""
Execute with automatic fallback
Returns (result, tier_used)
"""
# Try premium (full AI)
if self.current_tier == ServiceTier.PREMIUM:
try:
result = await premium_func(query)
self.cache[query] = result # Cache for future fallbacks
return result, ServiceTier.PREMIUM
except Exception as e:
print(f"Premium service failed: {e}, falling back to standard")
self.current_tier = ServiceTier.STANDARD
# Try standard (limited AI)
if self.current_tier == ServiceTier.STANDARD and standard_func:
try:
result = await standard_func(query)
return result, ServiceTier.STANDARD
except Exception as e:
print(f"Standard service failed: {e}, falling back to basic")
self.current_tier = ServiceTier.BASIC
# Try basic (cache only)
if self.current_tier == ServiceTier.BASIC:
if query in self.cache:
return self.cache[query], ServiceTier.BASIC
elif basic_func:
try:
result = await basic_func(query)
return result, ServiceTier.BASIC
except Exception as e:
print(f"Basic service failed: {e}, using emergency fallback")
self.current_tier = ServiceTier.EMERGENCY
# Emergency fallback
return self._emergency_response(query), ServiceTier.EMERGENCY
def _emergency_response(self, query: str) -> str:
"""Provide minimal emergency response"""
return (
"We're experiencing technical difficulties. "
"Please try again in a few minutes, or contact support at support@example.com"
)
def recover_service_tier(self):
"""Attempt to recover to higher service tier"""
if self.current_tier != ServiceTier.PREMIUM:
print(f"Attempting to recover from {self.current_tier.value} to premium")
self.current_tier = ServiceTier.PREMIUM
# Usage
class ResilientAgentSystem:
def __init__(self):
self.degradation = GracefulDegradation()
async def premium_service(self, query: str) -> str:
"""Full AI service with all capabilities"""
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4", timeout=30)
result = await llm.ainvoke(query)
return result.content
async def standard_service(self, query: str) -> str:
"""Limited AI service (faster/cheaper model)"""
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-3.5-turbo", timeout=15)
result = await llm.ainvoke(query)
return result.content
async def basic_service(self, query: str) -> str:
"""Rule-based service for common queries"""
# Simple pattern matching for common queries
if "hello" in query.lower():
return "Hello! How can I help you today?"
elif "status" in query.lower():
return "All systems operational."
else:
raise Exception("No basic handler for this query")
async def handle_request(self, request_id: str, query: str) -> dict:
"""Handle request with graceful degradation"""
result, tier = await self.degradation.execute_with_fallback(
request_id=request_id,
query=query,
premium_func=self.premium_service,
standard_func=self.standard_service,
basic_func=self.basic_service
)
return {
"request_id": request_id,
"result": result,
"service_tier": tier.value,
"degraded": tier != ServiceTier.PREMIUM
}
# Example
async def main():
system = ResilientAgentSystem()
# Simulate various service conditions
requests = [
"What is machine learning?",
"Hello",
"What's the status?",
"Complex query requiring AI"
]
for i, query in enumerate(requests):
response = await system.handle_request(f"req-{i}", query)
print(f"\nRequest {i}:")
print(f" Query: {query}")
print(f" Tier: {response['service_tier']}")
print(f" Result: {response['result'][:100]}...")
# Periodically attempt recovery
if i % 3 == 0:
system.degradation.recover_service_tier()
# asyncio.run(main())
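In the example above, recovery is attempted inline every few requests. In a long-running service it is usually cleaner to probe for recovery from a background task. A minimal sketch; the 60-second interval is an arbitrary choice:
import asyncio
async def tier_recovery_loop(degradation: GracefulDegradation, interval: float = 60.0):
    """Periodically reset to PREMIUM so the next request re-tests the full service."""
    while True:
        await asyncio.sleep(interval)
        degradation.recover_service_tier()
# Started alongside the request handlers:
# recovery_task = asyncio.create_task(tier_recovery_loop(system.degradation))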
Pattern 6: Distributed Tracing
Track requests across multiple agents and services.
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional, Dict
@dataclass
class Span:
"""Represents a unit of work in distributed trace"""
span_id: str
parent_span_id: Optional[str]
trace_id: str
operation: str
start_time: datetime
end_time: Optional[datetime] = None
duration_ms: Optional[float] = None
tags: Dict[str, str] = field(default_factory=dict)
logs: List[dict] = field(default_factory=list)
error: Optional[str] = None
class DistributedTracer:
"""
Distributed tracing for multi-agent systems
"""
def __init__(self):
self.traces: Dict[str, List[Span]] = {}
def start_trace(self, operation: str) -> tuple[str, str]:
"""
Start new trace
Returns (trace_id, span_id)
"""
trace_id = str(uuid.uuid4())
span_id = str(uuid.uuid4())
span = Span(
span_id=span_id,
parent_span_id=None,
trace_id=trace_id,
operation=operation,
start_time=datetime.now()
)
if trace_id not in self.traces:
self.traces[trace_id] = []
self.traces[trace_id].append(span)
return trace_id, span_id
def start_span(
self,
trace_id: str,
parent_span_id: str,
operation: str,
tags: dict = None
) -> str:
"""Start child span"""
span_id = str(uuid.uuid4())
span = Span(
span_id=span_id,
parent_span_id=parent_span_id,
trace_id=trace_id,
operation=operation,
start_time=datetime.now(),
tags=tags or {}
)
self.traces[trace_id].append(span)
return span_id
def end_span(
self,
trace_id: str,
span_id: str,
error: str = None,
tags: dict = None
):
"""End span and record duration"""
for span in self.traces[trace_id]:
if span.span_id == span_id:
span.end_time = datetime.now()
span.duration_ms = (
(span.end_time - span.start_time).total_seconds() * 1000
)
span.error = error
if tags:
span.tags.update(tags)
break
def log_event(
self,
trace_id: str,
span_id: str,
message: str,
level: str = "info"
):
"""Add log to span"""
for span in self.traces[trace_id]:
if span.span_id == span_id:
span.logs.append({
"timestamp": datetime.now().isoformat(),
"level": level,
"message": message
})
break
def get_trace(self, trace_id: str) -> List[Span]:
"""Get all spans for trace"""
return self.traces.get(trace_id, [])
def visualize_trace(self, trace_id: str) -> str:
"""Simple text visualization of trace"""
spans = self.get_trace(trace_id)
if not spans:
return "Trace not found"
# Build tree structure
root_spans = [s for s in spans if s.parent_span_id is None]
def print_span(span: Span, indent: int = 0) -> List[str]:
lines = []
prefix = " " * indent
duration = f"{span.duration_ms:.2f}ms" if span.duration_ms else "running"
error_marker = " ❌" if span.error else ""
lines.append(f"{prefix}├─ {span.operation} ({duration}){error_marker}")
if span.tags:
lines.append(f"{prefix} Tags: {span.tags}")
if span.logs:
for log in span.logs:
lines.append(f"{prefix} Log: {log['message']}")
# Find children
children = [s for s in spans if s.parent_span_id == span.span_id]
for child in children:
lines.extend(print_span(child, indent + 1))
return lines
output = [f"Trace: {trace_id}\n"]
for root in root_spans:
output.extend(print_span(root))
return "\n".join(output)
# Usage with multi-agent system
class TracedMultiAgentSystem:
def __init__(self, tracer: DistributedTracer):
self.tracer = tracer
async def handle_request(self, user_query: str) -> str:
"""Handle request with full tracing"""
# Start trace
trace_id, root_span_id = self.tracer.start_trace("user_request")
try:
# Step 1: Route request
route_span_id = self.tracer.start_span(
trace_id, root_span_id, "route_request",
tags={"query": user_query[:50]}
)
agent_type = await self._route_request(trace_id, route_span_id, user_query)
self.tracer.end_span(trace_id, route_span_id, tags={"agent": agent_type})
# Step 2: Execute with appropriate agent
execute_span_id = self.tracer.start_span(
trace_id, root_span_id, f"execute_{agent_type}",
tags={"agent_type": agent_type}
)
result = await self._execute_agent(
trace_id, execute_span_id, agent_type, user_query
)
self.tracer.end_span(trace_id, execute_span_id)
# Step 3: Post-process
process_span_id = self.tracer.start_span(
trace_id, root_span_id, "post_process"
)
final_result = await self._post_process(
trace_id, process_span_id, result
)
self.tracer.end_span(trace_id, process_span_id)
# End root span
self.tracer.end_span(trace_id, root_span_id)
return final_result
except Exception as e:
self.tracer.log_event(
trace_id, root_span_id,
f"Error: {str(e)}",
level="error"
)
self.tracer.end_span(trace_id, root_span_id, error=str(e))
raise e
async def _route_request(
self, trace_id: str, span_id: str, query: str
) -> str:
"""Route to appropriate agent"""
self.tracer.log_event(trace_id, span_id, "Analyzing query for routing")
# Simulate routing logic
import asyncio
await asyncio.sleep(0.1)
if "code" in query.lower():
agent = "coder"
elif "research" in query.lower():
agent = "researcher"
else:
agent = "general"
self.tracer.log_event(trace_id, span_id, f"Routed to {agent}")
return agent
async def _execute_agent(
self, trace_id: str, span_id: str, agent_type: str, query: str
) -> str:
"""Execute specific agent"""
# Sub-span for LLM call
llm_span_id = self.tracer.start_span(
trace_id, span_id, "llm_call",
tags={"model": "gpt-4"}
)
self.tracer.log_event(trace_id, llm_span_id, "Calling LLM")
# Simulate LLM call
import asyncio
await asyncio.sleep(0.5)
self.tracer.log_event(trace_id, llm_span_id, "LLM response received")
self.tracer.end_span(trace_id, llm_span_id, tags={"tokens": "1250"})
return f"Result from {agent_type} agent"
async def _post_process(
self, trace_id: str, span_id: str, result: str
) -> str:
"""Post-process result"""
self.tracer.log_event(trace_id, span_id, "Post-processing result")
import asyncio
await asyncio.sleep(0.1)
return f"Processed: {result}"
# Example
async def main():
tracer = DistributedTracer()
system = TracedMultiAgentSystem(tracer)
# Handle request
result = await system.handle_request("Help me write code for sorting")
# Visualize trace
traces = list(tracer.traces.keys())
if traces:
print(tracer.visualize_trace(traces[0]))
# asyncio.run(main())
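The hand-rolled tracer above is useful for illustrating the concepts, but in production you would normally emit spans through OpenTelemetry so they flow into Jaeger, Tempo, or a managed APM (see Resources). A minimal sketch using the opentelemetry-sdk package; the console exporter is for illustration only, and the routing and agent calls are stubbed:
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor
# One-time setup: print spans to stdout (swap in an OTLP exporter for production)
provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)
tracer = trace.get_tracer("agent-orchestrator")
async def handle_request(user_query: str) -> str:
    # Child spans are parented automatically through context propagation
    with tracer.start_as_current_span("user_request") as span:
        span.set_attribute("query.preview", user_query[:50])
        with tracer.start_as_current_span("route_request"):
            agent_type = "general"  # routing logic goes here
        with tracer.start_as_current_span(f"execute_{agent_type}"):
            result = f"Result from {agent_type} agent"  # agent call goes here
        return result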
Deployment Patterns
Container Orchestration (Kubernetes)
apiVersion: apps/v1
kind: Deployment
metadata:
name: agent-orchestrator
spec:
replicas: 3
selector:
matchLabels:
app: agent-orchestrator
template:
metadata:
labels:
app: agent-orchestrator
spec:
containers:
- name: orchestrator
image: agent-orchestrator:latest
resources:
requests:
memory: "512Mi"
cpu: "500m"
limits:
memory: "1Gi"
cpu: "1000m"
env:
- name: MAX_CONCURRENT_AGENTS
value: "10"
- name: COST_LIMIT_HOURLY
value: "10.0"
livenessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /ready
port: 8080
initialDelaySeconds: 5
periodSeconds: 5
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: agent-orchestrator-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: agent-orchestrator
minReplicas: 2
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
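The liveness and readiness probes above assume the orchestrator serves /health and /ready over HTTP on port 8080. A minimal sketch with FastAPI; the framework choice and the dependency check are assumptions, and any HTTP server works:
from fastapi import FastAPI, Response
app = FastAPI()
@app.get("/health")
async def health() -> dict:
    # Liveness: the process is up and the event loop is responsive
    return {"status": "ok"}
@app.get("/ready")
async def ready(response: Response) -> dict:
    # Readiness: only accept traffic once downstream dependencies are reachable
    dependencies_ok = True  # e.g. LLM provider, queue backend, database
    if not dependencies_ok:
        response.status_code = 503
        return {"status": "not ready"}
    return {"status": "ready"}
# Run with: uvicorn main:app --host 0.0.0.0 --port 8080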
Conclusion
Production agent orchestration requires:
- Resilience: Circuit breakers, retries, fallbacks
- Scalability: Request queuing, load balancing, auto-scaling
- Observability: Metrics, logging, tracing
- Cost Control: Budgets, rate limiting, optimization
- Graceful Degradation: Service tiers, caching, emergency responses
The patterns in this guide are battle-tested in production systems handling thousands of requests daily. Start with the basics (circuit breakers, observability) and layer in additional patterns as your system scales.
Remember: The best AI system is one that works reliably when it matters most.
Resources
- OpenTelemetry: Standard for distributed tracing
- Prometheus + Grafana: Metrics and monitoring
- Sentry: Error tracking and alerting
- AWS X-Ray / DataDog APM: Managed tracing solutions
Build systems that can handle production, and you’ll build AI that delivers real value.