Building Multi-Agent Systems with LangChain: Orchestrating Collaborative AI
The Rise of Multi-Agent AI
Single AI agents are powerful. But the real magic happens when multiple specialized agents collaborate, each bringing unique capabilities to solve complex problems that would overwhelm any single agent.
This post explores how to build production-ready multi-agent systems using LangChain, one of the most widely used frameworks for orchestrating LLM-powered agents. We’ll progress from basic concepts to sophisticated architectures you can deploy today.
Why Multi-Agent Systems?
The Specialization Advantage
Consider a customer service automation system:
Single Agent Approach:
- One agent handles everything
- Mixed results across different request types
- Difficult to optimize
- Hard to debug failures
Multi-Agent Approach:
- Triage Agent: Routes requests to specialists
- Technical Agent: Handles technical issues (tool: documentation search)
- Billing Agent: Resolves payment issues (tool: billing API)
- Escalation Agent: Handles complex cases requiring human review
Each agent excels at its specialty, and the system is easier to maintain and improve.
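The routing idea behind the multi-agent approach can be sketched without any framework. Here is a minimal, hedged example — the keyword rules and handler names are invented for illustration; a production triage agent would use an LLM classifier instead of keywords:

```python
def triage(request: str) -> str:
    """Route a request to a specialist based on simple keyword rules."""
    text = request.lower()
    if any(k in text for k in ("invoice", "refund", "charge", "payment")):
        return "billing"
    if any(k in text for k in ("error", "crash", "bug", "api")):
        return "technical"
    return "escalation"  # Anything unrecognized goes to human review

# Hypothetical specialist handlers standing in for full agents
HANDLERS = {
    "billing": lambda r: f"[billing agent] resolving: {r}",
    "technical": lambda r: f"[technical agent] investigating: {r}",
    "escalation": lambda r: f"[escalation agent] flagging for human review: {r}",
}

def handle(request: str) -> str:
    """Dispatch a request to whichever specialist the triage step picked."""
    return HANDLERS[triage(request)](request)

print(handle("I was charged twice for my subscription"))
```

Swapping the keyword rules for an LLM call turns this into the triage pattern described above; the dispatch structure stays the same.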
Real-World Use Cases
- Research Assistant: Coordinator → Search Agent → Analysis Agent → Writer Agent
- Code Generation: Planner → Coder → Reviewer → Tester
- Data Pipeline: Ingestion Agent → Validation Agent → Transform Agent → Quality Agent
- Security Operations: Monitor Agent → Triage Agent → Investigator Agent → Remediation Agent
LangChain Foundation
Setting Up
pip install langchain langchain-openai langchain-community langgraph
import os
from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.tools import tool
from langchain.schema import HumanMessage, AIMessage, SystemMessage
from langgraph.graph import StateGraph, END

# Initialize LLM
llm = ChatOpenAI(
    model="gpt-4",
    temperature=0,
    api_key=os.getenv("OPENAI_API_KEY")
)
Basic Agent Structure
@tool
def search_documentation(query: str) -> str:
    """Search technical documentation for information."""
    # Simplified - would connect to a vector DB in production
    return f"Documentation results for: {query}"

@tool
def query_database(sql_query: str) -> str:
    """Execute a SQL query against the database."""
    # Simplified - would use an actual DB connection
    return f"Query results for: {sql_query}"
# Create prompt template
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful AI assistant. Use the available tools to answer questions."),
    MessagesPlaceholder(variable_name="chat_history", optional=True),
    ("human", "{input}"),
    MessagesPlaceholder(variable_name="agent_scratchpad"),
])

# Create tools list
tools = [search_documentation, query_database]

# Create agent
agent = create_openai_functions_agent(llm, tools, prompt)

# Create executor
agent_executor = AgentExecutor(
    agent=agent,
    tools=tools,
    verbose=True,
    return_intermediate_steps=True
)

# Test
result = agent_executor.invoke({
    "input": "What are the authentication requirements?"
})
print(result["output"])
Pattern 1: Sequential Agents
Agents execute in a pipeline, each refining the previous agent’s output.
from typing import TypedDict
from langgraph.graph import StateGraph

class ResearchState(TypedDict):
    """Shared state passed between agents"""
    query: str
    raw_data: str
    analyzed_data: str
    final_report: str
    messages: list
@tool
def web_search(query: str) -> str:
    """Search the web for information."""
    # Would use an actual search API (Tavily, SerpAPI, etc.)
    return f"Search results for: {query}"

@tool
def data_analysis(data: str) -> str:
    """Analyze data and extract insights."""
    # Would perform actual analysis
    return f"Analysis of: {data}"

# Agent 1: Researcher
def research_agent(state: ResearchState) -> ResearchState:
    """Gathers raw information"""
    system_prompt = f"""You are a research agent. Search for information about: {state['query']}
Use the web_search tool to gather comprehensive information."""
    agent = create_openai_functions_agent(llm, [web_search], ChatPromptTemplate.from_messages([
        ("system", system_prompt),
        ("human", "{input}"),
        MessagesPlaceholder(variable_name="agent_scratchpad")
    ]))
    executor = AgentExecutor(agent=agent, tools=[web_search])
    result = executor.invoke({"input": state["query"]})
    state["raw_data"] = result["output"]
    state["messages"].append(f"Research completed: {len(result['output'])} chars gathered")
    return state
# Agent 2: Analyzer
def analysis_agent(state: ResearchState) -> ResearchState:
    """Analyzes gathered information"""
    prompt = f"""You are an analysis agent. Analyze the following data and extract key insights:

{state['raw_data']}

Provide structured analysis with:
- Key findings
- Patterns
- Recommendations"""
    response = llm.invoke([SystemMessage(content=prompt)])
    state["analyzed_data"] = response.content
    state["messages"].append("Analysis completed")
    return state

# Agent 3: Writer
def writer_agent(state: ResearchState) -> ResearchState:
    """Creates final report"""
    prompt = f"""You are a report writing agent. Create a professional report based on:

Original Query: {state['query']}
Analysis: {state['analyzed_data']}

Format the report with:
- Executive Summary
- Detailed Findings
- Recommendations
- Conclusion"""
    response = llm.invoke([SystemMessage(content=prompt)])
    state["final_report"] = response.content
    state["messages"].append("Report completed")
    return state
# Build sequential workflow
workflow = StateGraph(ResearchState)

# Add nodes
workflow.add_node("researcher", research_agent)
workflow.add_node("analyzer", analysis_agent)
workflow.add_node("writer", writer_agent)

# Define flow
workflow.set_entry_point("researcher")
workflow.add_edge("researcher", "analyzer")
workflow.add_edge("analyzer", "writer")
workflow.add_edge("writer", END)

# Compile
app = workflow.compile()

# Execute
initial_state = {
    "query": "Latest developments in quantum computing",
    "raw_data": "",
    "analyzed_data": "",
    "final_report": "",
    "messages": []
}
result = app.invoke(initial_state)
print(result["final_report"])
Pattern 2: Parallel Agents
Multiple agents work simultaneously, then results are aggregated.
from typing import List
import asyncio

class ParallelState(TypedDict):
    """State for parallel execution"""
    task: str
    specialist_results: List[str]
    aggregated_result: str

# Specialist agents
def security_specialist(task: str) -> str:
    """Analyzes security implications"""
    prompt = f"""You are a security specialist. Analyze the security implications of: {task}

Focus on:
- Vulnerabilities
- Attack vectors
- Mitigation strategies"""
    response = llm.invoke([SystemMessage(content=prompt)])
    return f"Security Analysis:\n{response.content}"

def performance_specialist(task: str) -> str:
    """Analyzes performance implications"""
    prompt = f"""You are a performance specialist. Analyze the performance implications of: {task}

Focus on:
- Scalability
- Bottlenecks
- Optimization opportunities"""
    response = llm.invoke([SystemMessage(content=prompt)])
    return f"Performance Analysis:\n{response.content}"

def cost_specialist(task: str) -> str:
    """Analyzes cost implications"""
    prompt = f"""You are a cost analyst. Analyze the cost implications of: {task}

Focus on:
- Implementation costs
- Operational costs
- ROI considerations"""
    response = llm.invoke([SystemMessage(content=prompt)])
    return f"Cost Analysis:\n{response.content}"

# Parallel execution node
async def run_specialists(state: ParallelState) -> ParallelState:
    """Run all specialists in parallel"""
    task = state["task"]
    # Execute all specialists concurrently in worker threads
    results = await asyncio.gather(
        asyncio.to_thread(security_specialist, task),
        asyncio.to_thread(performance_specialist, task),
        asyncio.to_thread(cost_specialist, task)
    )
    state["specialist_results"] = list(results)
    return state
# Aggregator agent
def aggregator_agent(state: ParallelState) -> ParallelState:
    """Aggregates specialist insights"""
    all_results = "\n\n".join(state["specialist_results"])
    prompt = f"""You are a senior architect. Review these specialist analyses:

{all_results}

Create a comprehensive summary that:
- Synthesizes all perspectives
- Identifies trade-offs
- Provides actionable recommendations
- Highlights critical concerns"""
    response = llm.invoke([SystemMessage(content=prompt)])
    state["aggregated_result"] = response.content
    return state

# Build parallel workflow
workflow = StateGraph(ParallelState)
workflow.add_node("specialists", lambda s: asyncio.run(run_specialists(s)))
workflow.add_node("aggregator", aggregator_agent)
workflow.set_entry_point("specialists")
workflow.add_edge("specialists", "aggregator")
workflow.add_edge("aggregator", END)
app = workflow.compile()

# Execute
result = app.invoke({
    "task": "Implementing a new microservices architecture",
    "specialist_results": [],
    "aggregated_result": ""
})
print(result["aggregated_result"])
Pattern 3: Hierarchical Agents
A manager agent plans the work dynamically and delegates tasks to specialized worker agents.
class HierarchicalState(TypedDict):
    """State for hierarchical execution"""
    user_request: str
    plan: List[dict]
    worker_results: List[dict]
    final_response: str

@tool
def execute_code(code: str) -> str:
    """Execute Python code safely."""
    # In production, use sandboxed execution
    return f"Code executed: {code[:100]}..."

@tool
def search_api(endpoint: str) -> str:
    """Call an external API."""
    return f"API response from {endpoint}"

@tool
def file_operations(operation: str, path: str) -> str:
    """Perform file operations."""
    return f"File operation: {operation} on {path}"
# Manager Agent
def manager_agent(state: HierarchicalState) -> HierarchicalState:
    """Plans and coordinates work"""
    prompt = f"""You are a manager agent coordinating a team of specialist workers:
- Coder: Writes and executes code
- API Specialist: Interacts with external APIs
- File Manager: Handles file operations

User Request: {state['user_request']}

Create a detailed plan breaking down the work into tasks for each specialist.
Return a JSON list of tasks with format:
[{{"agent": "coder", "task": "...", "tools": ["execute_code"]}}, ...]"""
    response = llm.invoke([SystemMessage(content=prompt)])
    # Parse plan (simplified - production code would parse response.content as JSON)
    state["plan"] = [
        {"agent": "coder", "task": "Write data processing script"},
        {"agent": "api_specialist", "task": "Fetch data from API"},
        {"agent": "file_manager", "task": "Save results to file"}
    ]
    return state
# Worker Agents
# The three workers share the same shape (one system prompt, one tool),
# so a small factory avoids triplicating the agent-construction code.
def make_worker(name: str, system_prompt: str, tool_fn):
    """Build a worker that runs a single-tool agent on its assigned task."""
    def worker(task: dict) -> dict:
        agent = create_openai_functions_agent(
            llm,
            [tool_fn],
            ChatPromptTemplate.from_messages([
                ("system", system_prompt),
                ("human", "{input}"),
                MessagesPlaceholder(variable_name="agent_scratchpad")
            ])
        )
        executor = AgentExecutor(agent=agent, tools=[tool_fn])
        result = executor.invoke({"input": task["task"]})
        return {"agent": name, "result": result["output"]}
    return worker

coder_agent = make_worker(
    "coder",
    "You are an expert programmer. Write production-quality code with error handling.",
    execute_code
)
api_specialist_agent = make_worker(
    "api_specialist",
    "You are an API integration specialist. Use best practices for API calls.",
    search_api
)
file_manager_agent = make_worker(
    "file_manager",
    "You are a file management specialist. Ensure proper file handling and error checking.",
    file_operations
)
# Worker dispatcher
def dispatch_workers(state: HierarchicalState) -> HierarchicalState:
    """Execute planned tasks with appropriate workers"""
    agent_map = {
        "coder": coder_agent,
        "api_specialist": api_specialist_agent,
        "file_manager": file_manager_agent
    }
    results = []
    for task in state["plan"]:
        agent_func = agent_map[task["agent"]]
        results.append(agent_func(task))
    state["worker_results"] = results
    return state
# Synthesizer
def synthesize_results(state: HierarchicalState) -> HierarchicalState:
    """Combine worker results into final response"""
    results_text = "\n\n".join(
        f"{r['agent']}: {r['result']}" for r in state["worker_results"]
    )
    prompt = f"""Synthesize these worker results into a cohesive response:

{results_text}

Original request: {state['user_request']}

Provide a clear summary of what was accomplished."""
    response = llm.invoke([SystemMessage(content=prompt)])
    state["final_response"] = response.content
    return state

# Build hierarchical workflow
workflow = StateGraph(HierarchicalState)
workflow.add_node("manager", manager_agent)
workflow.add_node("workers", dispatch_workers)
workflow.add_node("synthesizer", synthesize_results)
workflow.set_entry_point("manager")
workflow.add_edge("manager", "workers")
workflow.add_edge("workers", "synthesizer")
workflow.add_edge("synthesizer", END)
app = workflow.compile()

# Execute
result = app.invoke({
    "user_request": "Fetch user data from API, process it, and save to CSV",
    "plan": [],
    "worker_results": [],
    "final_response": ""
})
print(result["final_response"])
Pattern 4: Autonomous Agents with Feedback Loops
Agents that self-correct and improve through iteration.
class AutonomousState(TypedDict):
    """State for autonomous agent with feedback"""
    objective: str
    current_attempt: int
    max_attempts: int
    solution: str
    validation_result: dict
    final_solution: str

def problem_solver(state: AutonomousState) -> AutonomousState:
    """Generate a solution attempt"""
    previous_feedback = ""
    if state["current_attempt"] > 0:
        previous_feedback = f"""
Previous attempt failed with:
{state['validation_result'].get('error', '')}

Improve your solution based on this feedback."""
    prompt = f"""You are a problem-solving agent.

Objective: {state['objective']}
Attempt: {state['current_attempt'] + 1}/{state['max_attempts']}
{previous_feedback}

Provide a complete solution."""
    response = llm.invoke([SystemMessage(content=prompt)])
    state["solution"] = response.content
    state["current_attempt"] += 1
    return state
import json

def validator(state: AutonomousState) -> AutonomousState:
    """Validate solution quality"""
    prompt = f"""You are a quality assurance agent. Validate this solution:

Objective: {state['objective']}
Solution: {state['solution']}

Check for:
- Correctness
- Completeness
- Edge cases
- Best practices

Return validation results as:
{{"valid": true/false, "score": 0-100, "feedback": "...", "error": "..."}}"""
    response = llm.invoke([SystemMessage(content=prompt)])
    # Parse validation (simplified - production code should also handle fenced JSON)
    try:
        validation = json.loads(response.content)
    except json.JSONDecodeError:
        validation = {
            "valid": False,
            "score": 0,
            "feedback": "Parse error",
            "error": "Could not parse validation response as JSON"
        }
    state["validation_result"] = validation
    return state

def should_continue(state: AutonomousState) -> str:
    """Decision point: continue iterating or finish"""
    if state["validation_result"].get("valid", False):
        return "finish"
    elif state["current_attempt"] >= state["max_attempts"]:
        return "max_attempts"
    else:
        return "retry"

def finalize_solution(state: AutonomousState) -> AutonomousState:
    """Finalize the successful solution"""
    state["final_solution"] = state["solution"]
    return state
# Build autonomous workflow with feedback loop
workflow = StateGraph(AutonomousState)
workflow.add_node("solver", problem_solver)
workflow.add_node("validator", validator)
workflow.add_node("finalize", finalize_solution)
workflow.set_entry_point("solver")
workflow.add_edge("solver", "validator")

# Conditional branching
workflow.add_conditional_edges(
    "validator",
    should_continue,
    {
        "retry": "solver",       # Try again
        "finish": "finalize",    # Success
        "max_attempts": END      # Give up
    }
)
workflow.add_edge("finalize", END)
app = workflow.compile()

# Execute with self-correction
result = app.invoke({
    "objective": "Write a function to efficiently find duplicates in a large list",
    "current_attempt": 0,
    "max_attempts": 3,
    "solution": "",
    "validation_result": {},
    "final_solution": ""
})
print(f"Solution found after {result['current_attempt']} attempt(s):")
print(result["final_solution"])
Advanced Patterns
Memory and Context Sharing
from langchain.memory import ConversationBufferMemory

class SharedMemoryState(TypedDict):
    """State with shared memory across agents"""
    conversation_history: list
    shared_knowledge: dict

memory = ConversationBufferMemory(
    memory_key="chat_history",
    return_messages=True
)

def agent_with_memory(state: SharedMemoryState, agent_name: str) -> SharedMemoryState:
    """Agent that maintains conversation context"""
    # Load previous context
    history = state["conversation_history"]
    prompt = f"""You are {agent_name}. You have access to the conversation history:
{history}

And shared knowledge base:
{state['shared_knowledge']}

Use this context to provide informed responses."""
    response = llm.invoke([SystemMessage(content=prompt)])
    # Record this turn so later agents see it
    state["conversation_history"].append(f"{agent_name}: {response.content}")
    return state
Dynamic Tool Selection
@tool
def get_weather(location: str) -> str:
    """Get current weather for a location."""
    return f"Weather in {location}: Sunny, 72°F"

@tool
def get_stock_price(symbol: str) -> str:
    """Get current stock price."""
    return f"Stock {symbol}: $150.00"

@tool
def calculate(expression: str) -> str:
    """Evaluate a mathematical expression."""
    try:
        result = eval(expression)  # Use a safe evaluator in production!
        return str(result)
    except Exception:
        return "Invalid expression"

ALL_TOOLS = {
    "weather": get_weather,
    "stocks": get_stock_price,
    "math": calculate
}

def tool_selector_agent(query: str) -> list:
    """Dynamically select which tools are needed"""
    prompt = f"""Given this query: "{query}"

Available tools: {list(ALL_TOOLS.keys())}

Which tools would be most useful? Return as a JSON list: ["tool1", "tool2"]"""
    response = llm.invoke([SystemMessage(content=prompt)])
    # Parse and return the selected tools
    import json
    selected = json.loads(response.content)
    return [ALL_TOOLS[name] for name in selected if name in ALL_TOOLS]

# Use dynamically selected tools
query = "What's the weather in NYC and AAPL stock price?"
tools = tool_selector_agent(query)
# Create agent with only relevant tools...
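One practical wrinkle: LLMs often wrap JSON in markdown fences, so a bare `json.loads` on the raw response can fail. A small parsing helper handles this — the function name is my own, not a LangChain API, and this is one way to do it, not the only one:

```python
import json

def parse_tool_selection(raw: str) -> list:
    """Parse a JSON list from LLM output, tolerating ```json fences."""
    text = raw.strip()
    if text.startswith("```"):
        # Drop fence lines (including an optional language tag like ```json)
        lines = [ln for ln in text.splitlines() if not ln.startswith("```")]
        text = "\n".join(lines)
    try:
        selected = json.loads(text)
    except json.JSONDecodeError:
        return []  # Fall back to no tools rather than crashing the selector
    return selected if isinstance(selected, list) else []

print(parse_tool_selection('```json\n["weather", "stocks"]\n```'))
```

Dropping this into `tool_selector_agent` in place of the direct `json.loads` call makes the selector resilient to fenced or malformed model output.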
Production Considerations
1. Error Handling
import time

def robust_agent_wrapper(agent_func):
    """Wrapper for robust agent execution"""
    def wrapper(state):
        max_retries = 3
        for attempt in range(max_retries):
            try:
                return agent_func(state)
            except Exception as e:
                print(f"Agent error (attempt {attempt + 1}): {e}")
                if attempt == max_retries - 1:
                    # Final attempt failed
                    state["error"] = str(e)
                    state["status"] = "failed"
                    return state
                # Exponential backoff
                time.sleep(2 ** attempt)
        return state
    return wrapper

@robust_agent_wrapper
def fallible_agent(state):
    # Agent logic that might fail goes here
    return state
2. Cost Management
from langchain.callbacks import get_openai_callback

def cost_aware_execution(workflow_app, state):
    """Track and limit costs"""
    with get_openai_callback() as cb:
        result = workflow_app.invoke(state)
    print("Cost Summary:")
    print(f"  Tokens: {cb.total_tokens}")
    print(f"  Cost: ${cb.total_cost:.4f}")
    if cb.total_cost > 1.00:  # $1 threshold
        print("⚠️ Warning: High cost detected")
    return result
3. Observability
from datetime import datetime

class AgentTracer:
    """Track agent execution for debugging"""
    def __init__(self):
        self.trace = []

    def log_agent_start(self, agent_name, input_data):
        self.trace.append({
            "timestamp": datetime.now(),
            "event": "start",
            "agent": agent_name,
            "input": input_data
        })

    def log_agent_end(self, agent_name, output_data):
        self.trace.append({
            "timestamp": datetime.now(),
            "event": "end",
            "agent": agent_name,
            "output": output_data
        })

    def export_trace(self):
        """Export trace for analysis"""
        return self.trace

# Use in agents
tracer = AgentTracer()

def traced_agent(state):
    tracer.log_agent_start("my_agent", state)
    result = state  # ... actual agent logic would go here ...
    tracer.log_agent_end("my_agent", result)
    return result
Testing Multi-Agent Systems
import pytest

def test_sequential_workflow():
    """Test sequential agent execution"""
    state = {
        "query": "test query",
        "raw_data": "",
        "analyzed_data": "",
        "final_report": "",
        "messages": []
    }
    result = app.invoke(state)
    assert result["raw_data"] != ""
    assert result["analyzed_data"] != ""
    assert result["final_report"] != ""
    assert len(result["messages"]) == 3

def test_agent_isolation():
    """Ensure agents don't interfere with each other"""
    # Test that one agent's failure doesn't break others
    pass

def test_state_consistency():
    """Verify state remains consistent across agents"""
    # Validate state schema at each step
    pass
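Running the real LLM inside tests is slow, costly, and nondeterministic. One approach — a sketch under the assumption that your agent nodes are plain state-to-state functions, with stub names of my own invention — is to exercise the pipeline's state flow with deterministic stand-ins:

```python
def make_stub_agent(field: str, message: str):
    """Create a deterministic stand-in for an LLM-backed agent node."""
    def stub(state: dict) -> dict:
        state[field] = f"stub output for {state['query']}"
        state["messages"].append(message)
        return state
    return stub

def run_pipeline(state: dict, agents: list) -> dict:
    """Run agents sequentially, mimicking the compiled graph's node order."""
    for agent in agents:
        state = agent(state)
    return state

def test_pipeline_state_flow():
    agents = [
        make_stub_agent("raw_data", "Research completed"),
        make_stub_agent("analyzed_data", "Analysis completed"),
        make_stub_agent("final_report", "Report completed"),
    ]
    result = run_pipeline({"query": "test query", "messages": []}, agents)
    # Every stage populated its field, and messages arrived in pipeline order
    assert result["raw_data"] and result["analyzed_data"] and result["final_report"]
    assert result["messages"] == [
        "Research completed", "Analysis completed", "Report completed"
    ]

test_pipeline_state_flow()
```

The same idea extends to LangGraph itself: register the stubs as graph nodes instead of the real agents, and the structural tests above run without a single API call.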
Conclusion
Multi-agent systems represent the next evolution in AI applications. By combining specialized agents, you can:
- Solve complex problems that overwhelm single agents
- Improve reliability through redundancy and validation
- Scale capabilities by adding new specialized agents
- Maintain systems by isolating agent responsibilities
LangChain provides the building blocks, but the art lies in:
- Choosing the right architecture pattern
- Designing effective agent interactions
- Managing state and memory appropriately
- Building robust error handling
- Optimizing for cost and performance
Start simple with sequential agents, then evolve to parallel and hierarchical patterns as your needs grow. The future of AI is collaborative, and multi-agent systems are how we get there.
Resources
- LangChain Documentation: Comprehensive guides and API reference
- LangGraph: Advanced orchestration patterns
- Research Papers: “Communicating Agents” literature
- GitHub Examples: langchain-ai/langgraph examples repository
Build agents that work together, and you’ll build AI that works better.