Agent Orchestration Patterns for Production: Building Reliable AI Systems at Scale

Ryan Dahlberg
December 16, 2025 · 16 min read

From Prototype to Production

Building a demo agent is easy. Deploying agents that handle production traffic reliably, cost-effectively, and observably at scale? That’s an entirely different challenge.

This guide shares battle-tested orchestration patterns from running AI agents in production. We’ll cover the architecture decisions, failure modes, and solutions that only become apparent when real users depend on your system.


The Production Reality Check

What Changes in Production?

Prototype Assumptions:

  • APIs always respond quickly
  • LLMs never timeout
  • Users provide clean input
  • One request at a time
  • Unlimited budget
  • Simple debugging via print statements

Production Reality:

  • APIs fail, timeout, rate-limit
  • LLMs hallucinate, refuse, get slow
  • Users send malformed, adversarial input
  • Thousands of concurrent requests
  • Every token costs money
  • Complex distributed debugging needed

Pattern 1: Circuit Breaker for Agent Calls

Prevent cascading failures when agents or external services degrade.

from datetime import datetime, timedelta
from enum import Enum
import asyncio
from typing import Callable, Any

class CircuitState(Enum):
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Blocking calls
    HALF_OPEN = "half_open"  # Testing if service recovered

class CircuitBreaker:
    """
    Circuit breaker for agent calls
    Prevents cascading failures from unreliable services
    """
    def __init__(
        self,
        failure_threshold: int = 5,
        timeout: int = 60,
        recovery_timeout: int = 30
    ):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.recovery_timeout = recovery_timeout

        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    async def call(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function with circuit breaker protection"""

        # Check if circuit is open
        if self.state == CircuitState.OPEN:
            if self._should_attempt_recovery():
                self.state = CircuitState.HALF_OPEN
                print("Circuit breaker: Attempting recovery")
            else:
                raise Exception("Circuit breaker is OPEN")

        try:
            # Execute with timeout
            result = await asyncio.wait_for(
                func(*args, **kwargs),
                timeout=self.timeout
            )

            # Success - reset failure count (failures must be consecutive to trip)
            if self.state == CircuitState.HALF_OPEN:
                print("Circuit breaker: Recovery successful, closing circuit")
            self._reset()

            return result

        except Exception:
            # Failure - record it
            self._record_failure()
            raise

    def _record_failure(self):
        """Record a failure and potentially open circuit"""
        self.failure_count += 1
        self.last_failure_time = datetime.now()

        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            print(f"Circuit breaker: OPENED after {self.failure_count} failures")

    def _should_attempt_recovery(self) -> bool:
        """Check if enough time has passed to attempt recovery"""
        if self.last_failure_time is None:
            return True

        time_since_failure = (datetime.now() - self.last_failure_time).total_seconds()
        return time_since_failure >= self.recovery_timeout

    def _reset(self):
        """Reset circuit breaker to closed state"""
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

# Usage with agents
class ResilientAgent:
    def __init__(self):
        self.llm_breaker = CircuitBreaker(
            failure_threshold=3,
            timeout=30,
            recovery_timeout=60
        )
        self.api_breaker = CircuitBreaker(
            failure_threshold=5,
            timeout=10,
            recovery_timeout=30
        )

    async def call_llm(self, prompt: str) -> str:
        """Call LLM with circuit breaker protection"""
        async def _call():
            # Your LLM call here
            from langchain_openai import ChatOpenAI
            llm = ChatOpenAI(model="gpt-4", timeout=25)
            response = await llm.ainvoke(prompt)
            return response.content  # return text, matching the -> str signature

        try:
            return await self.llm_breaker.call(_call)
        except Exception as e:
            # Fallback strategy
            print(f"LLM call failed: {e}")
            return "Service temporarily unavailable. Please try again."

    async def call_external_api(self, endpoint: str) -> dict:
        """Call external API with circuit breaker protection"""
        async def _call():
            # Your API call here
            import httpx
            async with httpx.AsyncClient() as client:
                response = await client.get(endpoint, timeout=8)
                return response.json()

        try:
            return await self.api_breaker.call(_call)
        except Exception as e:
            print(f"API call failed: {e}")
            return {"error": "API unavailable", "fallback": True}

# Example usage
async def main():
    agent = ResilientAgent()

    # Multiple requests - circuit will open if failures exceed threshold
    for i in range(10):
        try:
            result = await agent.call_llm(f"Request {i}")
            print(f"Success: {result[:50]}...")
        except Exception as e:
            print(f"Request {i} failed: {e}")

        await asyncio.sleep(1)

# asyncio.run(main())
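
Circuit breakers pair naturally with bounded retries: retry transient failures a few times with backoff, and let the breaker stop the bleeding when failures persist. A minimal sketch; the helper name and defaults are illustrative, not from any library:

import asyncio
import random
from typing import Any, Callable

async def retry_with_backoff(
    func: Callable,
    *args,
    max_attempts: int = 3,
    base_delay: float = 0.5,
    **kwargs
) -> Any:
    """Retry an async callable with exponential backoff and jitter."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await func(*args, **kwargs)
        except Exception:
            if attempt == max_attempts:
                raise
            # Exponential backoff with jitter so retries don't synchronize
            delay = base_delay * (2 ** (attempt - 1)) * (0.5 + random.random())
            await asyncio.sleep(delay)

# Wrapping a breaker-protected call:
# result = await retry_with_backoff(agent.llm_breaker.call, _call, max_attempts=3)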

Pattern 2: Request Queue with Priority

Manage load and ensure important requests are processed first.

import heapq
from dataclasses import dataclass, field
from typing import Any
import asyncio
from datetime import datetime

@dataclass(order=True)
class PrioritizedRequest:
    """Request with priority"""
    priority: int
    timestamp: datetime = field(compare=False)
    request_id: str = field(compare=False)
    payload: Any = field(compare=False)
    future: asyncio.Future = field(compare=False, default=None)

class AgentRequestQueue:
    """
    Priority queue for agent requests
    Ensures high-priority requests are processed first
    """
    def __init__(self, max_concurrent: int = 10):
        self.queue = []
        self.max_concurrent = max_concurrent
        self.active_requests = 0
        self.lock = asyncio.Lock()
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def submit(
        self,
        payload: Any,
        priority: int = 5,
        request_id: str = None
    ) -> Any:
        """
        Submit request to queue
        Priority: 1 (highest) to 10 (lowest)
        """
        if request_id is None:
            request_id = f"req_{datetime.now().timestamp()}"

        # Create prioritized request
        request = PrioritizedRequest(
            priority=priority,
            timestamp=datetime.now(),
            request_id=request_id,
            payload=payload,
            future=asyncio.Future()
        )

        # Add to queue
        async with self.lock:
            heapq.heappush(self.queue, request)

        # Process queue
        asyncio.create_task(self._process_queue())

        # Wait for result
        return await request.future

    async def _process_queue(self):
        """Process requests from queue"""
        # Bound concurrency: blocks until one of max_concurrent slots frees up
        await self.semaphore.acquire()

        try:
            async with self.lock:
                if not self.queue:
                    return

                # Get highest priority request
                request = heapq.heappop(self.queue)

            # Process request
            try:
                result = await self._execute_request(request)
                request.future.set_result(result)
            except Exception as e:
                request.future.set_exception(e)

        finally:
            self.semaphore.release()

            # Process next request if available
            if self.queue:
                asyncio.create_task(self._process_queue())

    async def _execute_request(self, request: PrioritizedRequest) -> Any:
        """Execute agent request"""
        print(f"Processing {request.request_id} (priority {request.priority})")

        # Simulate agent execution
        await asyncio.sleep(2)  # Replace with actual agent call

        return {
            "request_id": request.request_id,
            "result": f"Processed: {request.payload}"
        }

# Usage
async def main():
    queue = AgentRequestQueue(max_concurrent=3)

    # Submit requests with different priorities
    tasks = [
        queue.submit("Critical operation", priority=1, request_id="critical-1"),
        queue.submit("Normal operation", priority=5, request_id="normal-1"),
        queue.submit("Low priority task", priority=9, request_id="low-1"),
        queue.submit("Another critical", priority=1, request_id="critical-2"),
        queue.submit("Normal task", priority=5, request_id="normal-2"),
    ]

    # Wait for all results
    results = await asyncio.gather(*tasks)

    for result in results:
        print(f"Completed: {result}")

# asyncio.run(main())
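
If you don't need the custom bookkeeping, asyncio's built-in PriorityQueue plus a fixed pool of workers achieves the same effect with less code. A sketch of that alternative; the counter tiebreaker matters, because it keeps tuple comparison from ever reaching the un-comparable Future:

import asyncio
import itertools

async def worker(queue: asyncio.PriorityQueue):
    """Pull the lowest priority number (= most urgent) item first."""
    while True:
        priority, _, payload, future = await queue.get()
        try:
            await asyncio.sleep(2)  # replace with the actual agent call
            future.set_result(f"Processed: {payload}")
        except Exception as e:
            future.set_exception(e)
        finally:
            queue.task_done()

async def main():
    queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
    counter = itertools.count()  # tiebreaker for equal priorities
    workers = [asyncio.create_task(worker(queue)) for _ in range(3)]  # max_concurrent=3

    futures = []
    for priority, payload in [(1, "critical"), (5, "normal"), (9, "low")]:
        future = asyncio.get_running_loop().create_future()
        queue.put_nowait((priority, next(counter), payload, future))
        futures.append(future)

    print(await asyncio.gather(*futures))
    for w in workers:
        w.cancel()

# asyncio.run(main())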

Pattern 3: Comprehensive Observability

Monitor agents in production with metrics, logging, and tracing.

from dataclasses import dataclass
from typing import Dict, List, Optional
import asyncio
import time
import json
from datetime import datetime

@dataclass
class AgentMetrics:
    """Metrics for agent execution"""
    agent_id: str
    request_id: str
    start_time: float
    end_time: float
    duration_ms: float
    tokens_used: int
    cost_usd: float
    success: bool
    error: Optional[str] = None
    tool_calls: Optional[List[str]] = None

class AgentObservability:
    """
    Comprehensive observability for agents
    """
    def __init__(self):
        self.metrics: List[AgentMetrics] = []
        self.active_requests: Dict[str, float] = {}

    def start_request(self, agent_id: str, request_id: str):
        """Track request start"""
        self.active_requests[request_id] = time.time()
        self._log({
            "event": "request_start",
            "agent_id": agent_id,
            "request_id": request_id,
            "timestamp": datetime.now().isoformat()
        })

    def end_request(
        self,
        agent_id: str,
        request_id: str,
        tokens_used: int,
        cost_usd: float,
        success: bool,
        tool_calls: List[str] = None,
        error: str = None
    ):
        """Track request completion"""
        start_time = self.active_requests.pop(request_id, time.time())
        end_time = time.time()
        duration_ms = (end_time - start_time) * 1000

        metrics = AgentMetrics(
            agent_id=agent_id,
            request_id=request_id,
            start_time=start_time,
            end_time=end_time,
            duration_ms=duration_ms,
            tokens_used=tokens_used,
            cost_usd=cost_usd,
            success=success,
            error=error,
            tool_calls=tool_calls or []
        )

        self.metrics.append(metrics)

        self._log({
            "event": "request_end",
            "agent_id": agent_id,
            "request_id": request_id,
            "duration_ms": duration_ms,
            "tokens_used": tokens_used,
            "cost_usd": cost_usd,
            "success": success,
            "error": error,
            "timestamp": datetime.now().isoformat()
        })

        # Alert on errors
        if not success:
            self._alert(f"Agent {agent_id} failed: {error}")

        # Alert on high costs
        if cost_usd > 1.0:
            self._alert(f"High cost request: ${cost_usd:.2f}")

        # Alert on slow requests
        if duration_ms > 30000:
            self._alert(f"Slow request: {duration_ms:.0f}ms")

    def get_statistics(self) -> dict:
        """Calculate aggregate statistics"""
        if not self.metrics:
            return {}

        total_requests = len(self.metrics)
        successful = sum(1 for m in self.metrics if m.success)
        failed = total_requests - successful

        total_duration = sum(m.duration_ms for m in self.metrics)
        total_tokens = sum(m.tokens_used for m in self.metrics)
        total_cost = sum(m.cost_usd for m in self.metrics)

        return {
            "total_requests": total_requests,
            "successful": successful,
            "failed": failed,
            "success_rate": successful / total_requests if total_requests > 0 else 0,
            "avg_duration_ms": total_duration / total_requests if total_requests > 0 else 0,
            "total_tokens": total_tokens,
            "total_cost_usd": total_cost,
            "avg_cost_per_request": total_cost / total_requests if total_requests > 0 else 0
        }

    def _log(self, data: dict):
        """Log structured data"""
        # In production, send to logging service (DataDog, CloudWatch, etc.)
        print(json.dumps(data))

    def _alert(self, message: str):
        """Send alert"""
        # In production, send to alerting service (PagerDuty, Slack, etc.)
        print(f"⚠️  ALERT: {message}")

# Usage with agents
class ObservableAgent:
    def __init__(self, agent_id: str, observability: AgentObservability):
        self.agent_id = agent_id
        self.obs = observability

    async def execute(self, request_id: str, query: str) -> str:
        """Execute agent with observability"""
        self.obs.start_request(self.agent_id, request_id)

        tokens_used = 0
        cost_usd = 0.0
        tool_calls = []
        error = None
        success = True

        try:
            # Your agent execution here
            from langchain_openai import ChatOpenAI
            from langchain.callbacks import get_openai_callback

            llm = ChatOpenAI(model="gpt-4")

            with get_openai_callback() as cb:
                result = await llm.ainvoke(query)
                tokens_used = cb.total_tokens
                cost_usd = cb.total_cost

            return result.content

        except Exception as e:
            success = False
            error = str(e)
            raise

        finally:
            self.obs.end_request(
                agent_id=self.agent_id,
                request_id=request_id,
                tokens_used=tokens_used,
                cost_usd=cost_usd,
                success=success,
                tool_calls=tool_calls,
                error=error
            )

# Example
async def main():
    obs = AgentObservability()
    agent = ObservableAgent("research-agent-1", obs)

    # Execute requests
    for i in range(5):
        try:
            result = await agent.execute(f"req-{i}", f"Query {i}")
            print(f"Result {i}: {result[:50]}...")
        except Exception as e:
            print(f"Request {i} failed: {e}")

    # View statistics
    stats = obs.get_statistics()
    print("\nStatistics:")
    print(json.dumps(stats, indent=2))

# asyncio.run(main())
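
The in-memory metrics list above is fine for illustration, but in production you would export to a metrics backend instead. A minimal sketch with prometheus_client (assuming the prometheus-client package is installed; metric names are illustrative):

from prometheus_client import Counter, Histogram, start_http_server

REQUESTS = Counter(
    "agent_requests_total", "Total agent requests", ["agent_id", "status"]
)
LATENCY = Histogram(
    "agent_request_duration_seconds", "Agent request latency", ["agent_id"]
)

start_http_server(9090)  # exposes /metrics for Prometheus to scrape

def record_request(agent_id: str, duration_s: float, success: bool):
    status = "success" if success else "error"
    REQUESTS.labels(agent_id=agent_id, status=status).inc()
    LATENCY.labels(agent_id=agent_id).observe(duration_s)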

Pattern 4: Cost Control and Budget Management

Prevent runaway costs in production.

import asyncio
from datetime import datetime, timedelta
from typing import Optional

class CostGuard:
    """
    Enforce cost limits for agent execution
    """
    def __init__(
        self,
        hourly_limit: float = 10.0,
        daily_limit: float = 100.0,
        per_request_limit: float = 1.0
    ):
        self.hourly_limit = hourly_limit
        self.daily_limit = daily_limit
        self.per_request_limit = per_request_limit

        self.hourly_spend = 0.0
        self.daily_spend = 0.0
        self.hour_reset = datetime.now() + timedelta(hours=1)
        self.day_reset = datetime.now() + timedelta(days=1)

    def check_budget(self, estimated_cost: float) -> tuple[bool, Optional[str]]:
        """
        Check if request can proceed within budget
        Returns (can_proceed, reason_if_blocked)
        """
        self._reset_counters()

        # Check per-request limit
        if estimated_cost > self.per_request_limit:
            return False, f"Request cost ${estimated_cost:.2f} exceeds per-request limit ${self.per_request_limit:.2f}"

        # Check hourly limit
        if self.hourly_spend + estimated_cost > self.hourly_limit:
            return False, f"Hourly budget ${self.hourly_limit:.2f} exceeded (current: ${self.hourly_spend:.2f})"

        # Check daily limit
        if self.daily_spend + estimated_cost > self.daily_limit:
            return False, f"Daily budget ${self.daily_limit:.2f} exceeded (current: ${self.daily_spend:.2f})"

        return True, None

    def record_spend(self, cost: float):
        """Record actual spend"""
        self._reset_counters()
        self.hourly_spend += cost
        self.daily_spend += cost

    def _reset_counters(self):
        """Reset counters if time windows have passed"""
        now = datetime.now()

        if now >= self.hour_reset:
            self.hourly_spend = 0.0
            self.hour_reset = now + timedelta(hours=1)

        if now >= self.day_reset:
            self.daily_spend = 0.0
            self.day_reset = now + timedelta(days=1)

    def get_remaining_budget(self) -> dict:
        """Get remaining budget in each category"""
        self._reset_counters()
        return {
            "hourly_remaining": self.hourly_limit - self.hourly_spend,
            "daily_remaining": self.daily_limit - self.daily_spend,
            "per_request_limit": self.per_request_limit
        }

# Usage
class BudgetControlledAgent:
    def __init__(self, cost_guard: CostGuard):
        self.cost_guard = cost_guard

    async def execute(self, query: str) -> str:
        """Execute with cost controls"""
        # Estimate cost (rough approximation)
        estimated_tokens = len(query.split()) * 1.3  # Rough estimate
        estimated_cost = (estimated_tokens / 1000) * 0.03  # GPT-4 pricing

        # Check budget
        can_proceed, reason = self.cost_guard.check_budget(estimated_cost)
        if not can_proceed:
            raise Exception(f"Budget limit reached: {reason}")

        # Execute
        from langchain_openai import ChatOpenAI
        from langchain.callbacks import get_openai_callback

        llm = ChatOpenAI(model="gpt-4")

        with get_openai_callback() as cb:
            result = await llm.ainvoke(query)
            actual_cost = cb.total_cost

        # Record actual spend
        self.cost_guard.record_spend(actual_cost)

        return result.content

# Example
async def main():
    cost_guard = CostGuard(
        hourly_limit=5.0,
        daily_limit=50.0,
        per_request_limit=0.50
    )

    agent = BudgetControlledAgent(cost_guard)

    # Execute until budget exhausted
    for i in range(100):
        try:
            result = await agent.execute(f"Query {i}" * 100)  # Large query
            print(f"Request {i} succeeded")

            remaining = cost_guard.get_remaining_budget()
            print(f"Remaining budget: ${remaining['hourly_remaining']:.2f}/hr, ${remaining['daily_remaining']:.2f}/day")

        except Exception as e:
            print(f"Request {i} blocked: {e}")
            break

# asyncio.run(main())
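
The word-count heuristic above is deliberately crude. For tighter pre-flight estimates, a tokenizer gives exact input token counts, while output tokens still have to be bounded via max_tokens. A sketch using tiktoken (assuming the tiktoken package; the prices are illustrative and change often):

import tiktoken

def estimate_request_cost(
    prompt: str,
    model: str = "gpt-4",
    max_output_tokens: int = 500,
    input_price_per_1k: float = 0.03,   # illustrative; check current pricing
    output_price_per_1k: float = 0.06,  # illustrative; check current pricing
) -> float:
    """Upper-bound cost: exact input tokens plus worst-case output tokens."""
    encoding = tiktoken.encoding_for_model(model)
    input_tokens = len(encoding.encode(prompt))
    return (
        (input_tokens / 1000) * input_price_per_1k
        + (max_output_tokens / 1000) * output_price_per_1k
    )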

Pattern 5: Graceful Degradation

Provide fallback behavior when systems fail.

import asyncio
from enum import Enum
from typing import Callable, Optional

class ServiceTier(Enum):
    """Service quality tiers"""
    PREMIUM = "premium"      # Full AI capabilities
    STANDARD = "standard"    # Cached + limited AI
    BASIC = "basic"         # Cached only
    EMERGENCY = "emergency"  # Static fallback

class GracefulDegradation:
    """
    Provide degraded service when full service unavailable
    """
    def __init__(self):
        self.current_tier = ServiceTier.PREMIUM
        self.cache = {}

    async def execute_with_fallback(
        self,
        request_id: str,
        query: str,
        premium_func: Callable,
        standard_func: Optional[Callable] = None,
        basic_func: Optional[Callable] = None
    ) -> tuple[str, ServiceTier]:
        """
        Execute with automatic fallback
        Returns (result, tier_used)
        """
        # Try premium (full AI)
        if self.current_tier == ServiceTier.PREMIUM:
            try:
                result = await premium_func(query)
                self.cache[query] = result  # Cache for future fallbacks
                return result, ServiceTier.PREMIUM
            except Exception as e:
                print(f"Premium service failed: {e}, falling back to standard")
                self.current_tier = ServiceTier.STANDARD

        # Try standard (limited AI)
        if self.current_tier == ServiceTier.STANDARD and standard_func:
            try:
                result = await standard_func(query)
                return result, ServiceTier.STANDARD
            except Exception as e:
                print(f"Standard service failed: {e}, falling back to basic")
                self.current_tier = ServiceTier.BASIC

        # Try basic (cache only)
        if self.current_tier == ServiceTier.BASIC:
            if query in self.cache:
                return self.cache[query], ServiceTier.BASIC
            elif basic_func:
                try:
                    result = await basic_func(query)
                    return result, ServiceTier.BASIC
                except Exception as e:
                    print(f"Basic service failed: {e}, using emergency fallback")
                    self.current_tier = ServiceTier.EMERGENCY

        # Emergency fallback
        return self._emergency_response(query), ServiceTier.EMERGENCY

    def _emergency_response(self, query: str) -> str:
        """Provide minimal emergency response"""
        return (
            "We're experiencing technical difficulties. "
            "Please try again in a few minutes, or contact support at support@example.com"
        )

    def recover_service_tier(self):
        """Attempt to recover to higher service tier"""
        if self.current_tier != ServiceTier.PREMIUM:
            print(f"Attempting to recover from {self.current_tier.value} to premium")
            self.current_tier = ServiceTier.PREMIUM

# Usage
class ResilientAgentSystem:
    def __init__(self):
        self.degradation = GracefulDegradation()

    async def premium_service(self, query: str) -> str:
        """Full AI service with all capabilities"""
        from langchain_openai import ChatOpenAI
        llm = ChatOpenAI(model="gpt-4", timeout=30)
        result = await llm.ainvoke(query)
        return result.content

    async def standard_service(self, query: str) -> str:
        """Limited AI service (faster/cheaper model)"""
        from langchain_openai import ChatOpenAI
        llm = ChatOpenAI(model="gpt-3.5-turbo", timeout=15)
        result = await llm.ainvoke(query)
        return result.content

    async def basic_service(self, query: str) -> str:
        """Rule-based service for common queries"""
        # Simple pattern matching for common queries
        if "hello" in query.lower():
            return "Hello! How can I help you today?"
        elif "status" in query.lower():
            return "All systems operational."
        else:
            raise Exception("No basic handler for this query")

    async def handle_request(self, request_id: str, query: str) -> dict:
        """Handle request with graceful degradation"""
        result, tier = await self.degradation.execute_with_fallback(
            request_id=request_id,
            query=query,
            premium_func=self.premium_service,
            standard_func=self.standard_service,
            basic_func=self.basic_service
        )

        return {
            "request_id": request_id,
            "result": result,
            "service_tier": tier.value,
            "degraded": tier != ServiceTier.PREMIUM
        }

# Example
async def main():
    system = ResilientAgentSystem()

    # Simulate various service conditions
    requests = [
        "What is machine learning?",
        "Hello",
        "What's the status?",
        "Complex query requiring AI"
    ]

    for i, query in enumerate(requests):
        response = await system.handle_request(f"req-{i}", query)
        print(f"\nRequest {i}:")
        print(f"  Query: {query}")
        print(f"  Tier: {response['service_tier']}")
        print(f"  Result: {response['result'][:100]}...")

        # Periodically attempt recovery
        if i % 3 == 0:
            system.degradation.recover_service_tier()

# asyncio.run(main())
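
In the example above, recovery is triggered manually every few requests. In a real deployment you would more likely run it on a timer so a transient outage cannot pin the system in a degraded tier indefinitely. A minimal sketch:

import asyncio

async def periodic_recovery(
    degradation: GracefulDegradation, interval_seconds: float = 60.0
):
    """Background task: periodically probe back up to the premium tier."""
    while True:
        await asyncio.sleep(interval_seconds)
        # If premium is still down, the next request simply degrades again
        degradation.recover_service_tier()

# Started alongside the server, e.g.:
# asyncio.create_task(periodic_recovery(system.degradation))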

Pattern 6: Distributed Tracing

Track requests across multiple agents and services.

import asyncio
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional, Dict

@dataclass
class Span:
    """Represents a unit of work in distributed trace"""
    span_id: str
    parent_span_id: Optional[str]
    trace_id: str
    operation: str
    start_time: datetime
    end_time: Optional[datetime] = None
    duration_ms: Optional[float] = None
    tags: Dict[str, str] = field(default_factory=dict)
    logs: List[dict] = field(default_factory=list)
    error: Optional[str] = None

class DistributedTracer:
    """
    Distributed tracing for multi-agent systems
    """
    def __init__(self):
        self.traces: Dict[str, List[Span]] = {}

    def start_trace(self, operation: str) -> tuple[str, str]:
        """
        Start new trace
        Returns (trace_id, span_id)
        """
        trace_id = str(uuid.uuid4())
        span_id = str(uuid.uuid4())

        span = Span(
            span_id=span_id,
            parent_span_id=None,
            trace_id=trace_id,
            operation=operation,
            start_time=datetime.now()
        )

        if trace_id not in self.traces:
            self.traces[trace_id] = []
        self.traces[trace_id].append(span)

        return trace_id, span_id

    def start_span(
        self,
        trace_id: str,
        parent_span_id: str,
        operation: str,
        tags: dict = None
    ) -> str:
        """Start child span"""
        span_id = str(uuid.uuid4())

        span = Span(
            span_id=span_id,
            parent_span_id=parent_span_id,
            trace_id=trace_id,
            operation=operation,
            start_time=datetime.now(),
            tags=tags or {}
        )

        self.traces[trace_id].append(span)
        return span_id

    def end_span(
        self,
        trace_id: str,
        span_id: str,
        error: str = None,
        tags: dict = None
    ):
        """End span and record duration"""
        for span in self.traces[trace_id]:
            if span.span_id == span_id:
                span.end_time = datetime.now()
                span.duration_ms = (
                    (span.end_time - span.start_time).total_seconds() * 1000
                )
                span.error = error
                if tags:
                    span.tags.update(tags)
                break

    def log_event(
        self,
        trace_id: str,
        span_id: str,
        message: str,
        level: str = "info"
    ):
        """Add log to span"""
        for span in self.traces[trace_id]:
            if span.span_id == span_id:
                span.logs.append({
                    "timestamp": datetime.now().isoformat(),
                    "level": level,
                    "message": message
                })
                break

    def get_trace(self, trace_id: str) -> List[Span]:
        """Get all spans for trace"""
        return self.traces.get(trace_id, [])

    def visualize_trace(self, trace_id: str) -> str:
        """Simple text visualization of trace"""
        spans = self.get_trace(trace_id)
        if not spans:
            return "Trace not found"

        # Build tree structure
        root_spans = [s for s in spans if s.parent_span_id is None]

        def print_span(span: Span, indent: int = 0) -> List[str]:
            lines = []
            prefix = "  " * indent
            duration = f"{span.duration_ms:.2f}ms" if span.duration_ms else "running"
            error_marker = " ❌" if span.error else ""

            lines.append(f"{prefix}├─ {span.operation} ({duration}){error_marker}")

            if span.tags:
                lines.append(f"{prefix}   Tags: {span.tags}")

            if span.logs:
                for log in span.logs:
                    lines.append(f"{prefix}   Log: {log['message']}")

            # Find children
            children = [s for s in spans if s.parent_span_id == span.span_id]
            for child in children:
                lines.extend(print_span(child, indent + 1))

            return lines

        output = [f"Trace: {trace_id}\n"]
        for root in root_spans:
            output.extend(print_span(root))

        return "\n".join(output)

# Usage with multi-agent system
class TracedMultiAgentSystem:
    def __init__(self, tracer: DistributedTracer):
        self.tracer = tracer

    async def handle_request(self, user_query: str) -> str:
        """Handle request with full tracing"""
        # Start trace
        trace_id, root_span_id = self.tracer.start_trace("user_request")

        try:
            # Step 1: Route request
            route_span_id = self.tracer.start_span(
                trace_id, root_span_id, "route_request",
                tags={"query": user_query[:50]}
            )

            agent_type = await self._route_request(trace_id, route_span_id, user_query)

            self.tracer.end_span(trace_id, route_span_id, tags={"agent": agent_type})

            # Step 2: Execute with appropriate agent
            execute_span_id = self.tracer.start_span(
                trace_id, root_span_id, f"execute_{agent_type}",
                tags={"agent_type": agent_type}
            )

            result = await self._execute_agent(
                trace_id, execute_span_id, agent_type, user_query
            )

            self.tracer.end_span(trace_id, execute_span_id)

            # Step 3: Post-process
            process_span_id = self.tracer.start_span(
                trace_id, root_span_id, "post_process"
            )

            final_result = await self._post_process(
                trace_id, process_span_id, result
            )

            self.tracer.end_span(trace_id, process_span_id)

            # End root span
            self.tracer.end_span(trace_id, root_span_id)

            return final_result

        except Exception as e:
            self.tracer.log_event(
                trace_id, root_span_id,
                f"Error: {str(e)}",
                level="error"
            )
            self.tracer.end_span(trace_id, root_span_id, error=str(e))
            raise

    async def _route_request(
        self, trace_id: str, span_id: str, query: str
    ) -> str:
        """Route to appropriate agent"""
        self.tracer.log_event(trace_id, span_id, "Analyzing query for routing")

        # Simulate routing logic
        await asyncio.sleep(0.1)

        if "code" in query.lower():
            agent = "coder"
        elif "research" in query.lower():
            agent = "researcher"
        else:
            agent = "general"

        self.tracer.log_event(trace_id, span_id, f"Routed to {agent}")
        return agent

    async def _execute_agent(
        self, trace_id: str, span_id: str, agent_type: str, query: str
    ) -> str:
        """Execute specific agent"""
        # Sub-span for LLM call
        llm_span_id = self.tracer.start_span(
            trace_id, span_id, "llm_call",
            tags={"model": "gpt-4"}
        )

        self.tracer.log_event(trace_id, llm_span_id, "Calling LLM")

        # Simulate LLM call
        await asyncio.sleep(0.5)

        self.tracer.log_event(trace_id, llm_span_id, "LLM response received")
        self.tracer.end_span(trace_id, llm_span_id, tags={"tokens": "1250"})

        return f"Result from {agent_type} agent"

    async def _post_process(
        self, trace_id: str, span_id: str, result: str
    ) -> str:
        """Post-process result"""
        self.tracer.log_event(trace_id, span_id, "Post-processing result")

        await asyncio.sleep(0.1)

        return f"Processed: {result}"

# Example
async def main():
    tracer = DistributedTracer()
    system = TracedMultiAgentSystem(tracer)

    # Handle request
    result = await system.handle_request("Help me write code for sorting")

    # Visualize trace
    traces = list(tracer.traces.keys())
    if traces:
        print(tracer.visualize_trace(traces[0]))

# asyncio.run(main())
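
The DistributedTracer above is a teaching implementation. For real systems, OpenTelemetry (listed in the resources at the end) provides the same trace/span model plus exporters for Jaeger, DataDog, and other backends. A minimal sketch, assuming the opentelemetry-api and opentelemetry-sdk packages:

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter

# Wire up a provider once at startup; swap ConsoleSpanExporter for an OTLP
# exporter to ship spans to your tracing backend
provider = TracerProvider()
provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)

tracer = trace.get_tracer("agent-orchestrator")

async def handle_request(user_query: str) -> str:
    with tracer.start_as_current_span("user_request") as span:
        span.set_attribute("query.preview", user_query[:50])
        with tracer.start_as_current_span("llm_call") as llm_span:
            llm_span.set_attribute("model", "gpt-4")
            # ... your agent call here ...
            return "result"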

Deployment Patterns

Container Orchestration (Kubernetes)

apiVersion: apps/v1
kind: Deployment
metadata:
  name: agent-orchestrator
spec:
  replicas: 3
  selector:
    matchLabels:
      app: agent-orchestrator
  template:
    metadata:
      labels:
        app: agent-orchestrator
    spec:
      containers:
      - name: orchestrator
        image: agent-orchestrator:latest
        resources:
          requests:
            memory: "512Mi"
            cpu: "500m"
          limits:
            memory: "1Gi"
            cpu: "1000m"
        env:
        - name: MAX_CONCURRENT_AGENTS
          value: "10"
        - name: COST_LIMIT_HOURLY
          value: "10.0"
        livenessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8080
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: agent-orchestrator-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: agent-orchestrator
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
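
The liveness and readiness probes above assume the orchestrator exposes /health and /ready endpoints on port 8080. A minimal sketch with FastAPI (an assumption; any HTTP framework works):

from fastapi import FastAPI, Response

app = FastAPI()
ready = False  # set True once queues, breakers, and model clients are warm

@app.get("/health")
async def health():
    # Liveness: the process is up and the event loop is responsive
    return {"status": "ok"}

@app.get("/ready")
async def readiness(response: Response):
    # Readiness: only accept traffic once dependencies are initialized
    if not ready:
        response.status_code = 503
    return {"ready": ready}

# uvicorn app_module:app --host 0.0.0.0 --port 8080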

Conclusion

Production agent orchestration requires:

  1. Resilience: Circuit breakers, retries, fallbacks
  2. Scalability: Request queuing, load balancing, auto-scaling
  3. Observability: Metrics, logging, tracing
  4. Cost Control: Budgets, rate limiting, optimization
  5. Graceful Degradation: Service tiers, caching, emergency responses

The patterns in this guide are battle-tested in production systems handling thousands of requests daily. Start with the basics (circuit breakers, observability) and layer in additional patterns as your system scales.

Remember: The best AI system is one that works reliably when it matters most.


Resources

  • OpenTelemetry: Standard for distributed tracing
  • Prometheus + Grafana: Metrics and monitoring
  • Sentry: Error tracking and alerting
  • AWS X-Ray / DataDog APM: Managed tracing solutions

Build systems that can handle production, and you’ll build AI that delivers real value.

#AI Agents #Production Systems #Orchestration #System Architecture #Reliability #Observability