Building Multi-Agent Systems with LangChain: Orchestrating Collaborative AI

Ryan Dahlberg
November 8, 2025 13 min read

The Rise of Multi-Agent AI

Single AI agents are powerful. But the real magic happens when multiple specialized agents collaborate, each bringing unique capabilities to solve complex problems that would overwhelm any single agent.

This post shows how to build multi-agent systems with LangChain, a widely used framework for orchestrating LLM-powered agents, and LangGraph, its companion library for graph-based workflows. We’ll progress from basic concepts to architectures you can adapt for production.


Why Multi-Agent Systems?

The Specialization Advantage

Consider a customer service automation system:

Single Agent Approach:

  • One agent handles everything
  • Mixed results across different request types
  • Difficult to optimize
  • Hard to debug failures

Multi-Agent Approach:

  • Triage Agent: Routes requests to specialists
  • Technical Agent: Handles technical issues (tool: documentation search)
  • Billing Agent: Resolves payment issues (tool: billing API)
  • Escalation Agent: Handles complex cases requiring human review

Each agent excels at its specialty, and the system is easier to maintain and improve.
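
The routing idea can be sketched without any LLM. In this minimal illustration the keyword rules, the handler functions, and the `triage` and `route_request` helpers are all assumptions made for the example; a real triage agent would more likely ask a model to classify the request.

```python
from typing import Callable, Dict

# Hypothetical specialist handlers; real ones would wrap LLM agents with tools.
def technical_agent(request: str) -> str:
    return f"[technical] searching documentation for: {request}"

def billing_agent(request: str) -> str:
    return f"[billing] checking billing API for: {request}"

def escalation_agent(request: str) -> str:
    return f"[escalation] queued for human review: {request}"

SPECIALISTS: Dict[str, Callable[[str], str]] = {
    "technical": technical_agent,
    "billing": billing_agent,
    "escalation": escalation_agent,
}

# Illustrative keyword rules standing in for an LLM classifier.
KEYWORDS = {
    "technical": ("error", "bug", "crash", "login"),
    "billing": ("invoice", "refund", "charge", "payment"),
}

def triage(request: str) -> str:
    """Return the label of the specialist that should handle the request."""
    text = request.lower()
    for label, words in KEYWORDS.items():
        if any(word in text for word in words):
            return label
    # Anything the rules cannot place goes to a human.
    return "escalation"

def route_request(request: str) -> str:
    """Classify the request, then hand it to the matching specialist."""
    return SPECIALISTS[triage(request)](request)
```

Because each specialist sits behind one dictionary entry, adding a new one means registering a handler and a rule rather than touching the others.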

Real-World Use Cases

  1. Research Assistant: Coordinator → Search Agent → Analysis Agent → Writer Agent
  2. Code Generation: Planner → Coder → Reviewer → Tester
  3. Data Pipeline: Ingestion Agent → Validation Agent → Transform Agent → Quality Agent
  4. Security Operations: Monitor Agent → Triage Agent → Investigator Agent → Remediation Agent

LangChain Foundation

Setting Up

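Install the packages:

pip install langchain langchain-openai langchain-community langgraph

Then import the building blocks used throughout this post:

import os
from langchain_openai import ChatOpenAI
# AgentExecutor and create_openai_functions_agent belong to LangChain's classic
# agent API; depending on your installed version, these imports may need adjusting
from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langgraph.graph import StateGraph, END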

# Initialize LLM
llm = ChatOpenAI(
    model="gpt-4",
    temperature=0,
    api_key=os.getenv("OPENAI_API_KEY")
)

Basic Agent Structure

@tool
def search_documentation(query: str) -> str:
    """Search technical documentation for information."""
    # Simplified - would connect to vector DB in production
    return f"Documentation results for: {query}"

@tool
def query_database(sql_query: str) -> str:
    """Execute SQL query against database."""
    # Simplified - would use actual DB connection
    return f"Query results for: {sql_query}"

# Create prompt template
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful AI assistant. Use the available tools to answer questions."),
    MessagesPlaceholder(variable_name="chat_history", optional=True),
    ("human", "{input}"),
    MessagesPlaceholder(variable_name="agent_scratchpad"),
])

# Create tools list
tools = [search_documentation, query_database]

# Create agent
agent = create_openai_functions_agent(llm, tools, prompt)

# Create executor
agent_executor = AgentExecutor(
    agent=agent,
    tools=tools,
    verbose=True,
    return_intermediate_steps=True
)

# Test
result = agent_executor.invoke({
    "input": "What are the authentication requirements?"
})
print(result["output"])

Pattern 1: Sequential Agents

Agents execute in a pipeline, each refining the previous agent’s output.

from typing import TypedDict
from langgraph.graph import StateGraph, END

class ResearchState(TypedDict):
    """Shared state passed between agents"""
    query: str
    raw_data: str
    analyzed_data: str
    final_report: str
    messages: list

@tool
def web_search(query: str) -> str:
    """Search the web for information."""
    # Would use actual search API (Tavily, SerpAPI, etc.)
    return f"Search results for: {query}"

@tool
def data_analysis(data: str) -> str:
    """Analyze data and extract insights."""
    # Would perform actual analysis
    return f"Analysis of: {data}"

# Agent 1: Researcher
def research_agent(state: ResearchState) -> ResearchState:
    """Gathers raw information"""
    # Keep the query out of the template text: braces in user input would
    # otherwise be parsed as template variables.
    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are a research agent. Use the web_search tool to gather "
                   "comprehensive information about the user's query."),
        ("human", "{input}"),
        MessagesPlaceholder(variable_name="agent_scratchpad"),
    ])

    agent = create_openai_functions_agent(llm, [web_search], prompt)
    executor = AgentExecutor(agent=agent, tools=[web_search])
    result = executor.invoke({"input": state["query"]})

    state["raw_data"] = result["output"]
    state["messages"].append(f"Research completed: {len(result['output'])} chars gathered")

    return state

# Agent 2: Analyzer
def analysis_agent(state: ResearchState) -> ResearchState:
    """Analyzes gathered information"""
    prompt = f"""You are an analysis agent. Analyze the following data and extract key insights:

    {state['raw_data']}

    Provide structured analysis with:
    - Key findings
    - Patterns
    - Recommendations"""

    response = llm.invoke([
        SystemMessage(content=prompt)
    ])

    state["analyzed_data"] = response.content
    state["messages"].append("Analysis completed")

    return state

# Agent 3: Writer
def writer_agent(state: ResearchState) -> ResearchState:
    """Creates final report"""
    prompt = f"""You are a report writing agent. Create a professional report based on:

    Original Query: {state['query']}

    Analysis: {state['analyzed_data']}

    Format the report with:
    - Executive Summary
    - Detailed Findings
    - Recommendations
    - Conclusion"""

    response = llm.invoke([
        SystemMessage(content=prompt)
    ])

    state["final_report"] = response.content
    state["messages"].append("Report completed")

    return state

# Build sequential workflow
workflow = StateGraph(ResearchState)

# Add nodes
workflow.add_node("researcher", research_agent)
workflow.add_node("analyzer", analysis_agent)
workflow.add_node("writer", writer_agent)

# Define flow
workflow.set_entry_point("researcher")
workflow.add_edge("researcher", "analyzer")
workflow.add_edge("analyzer", "writer")
workflow.add_edge("writer", END)

# Compile
app = workflow.compile()

# Execute
initial_state = {
    "query": "Latest developments in quantum computing",
    "raw_data": "",
    "analyzed_data": "",
    "final_report": "",
    "messages": []
}

result = app.invoke(initial_state)
print(result["final_report"])
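
To see how state moves through a sequential pipeline without calling a model, here is a minimal plain-Python sketch. The `run_pipeline` helper and the three stub nodes are hypothetical stand-ins written for this illustration; they mirror the idea of each step contributing its own keys to a shared state, not LangGraph's actual implementation.

```python
from typing import Callable, Dict, List

State = Dict[str, object]
Node = Callable[[State], State]

def run_pipeline(nodes: List[Node], state: State) -> State:
    """Run nodes in order, merging each node's partial update into the state."""
    current = dict(state)  # copy so the caller's dict is not mutated
    for node in nodes:
        update = node(current)
        current = {**current, **update}
    return current

# Stub nodes standing in for the researcher, analyzer, and writer agents.
def researcher(state: State) -> State:
    return {"raw_data": f"notes on {state['query']}"}

def analyzer(state: State) -> State:
    return {"analyzed_data": str(state["raw_data"]).upper()}

def writer(state: State) -> State:
    return {"final_report": f"Report: {state['analyzed_data']}"}

result = run_pipeline([researcher, analyzer, writer], {"query": "quantum computing"})
print(result["final_report"])
```

Each stub reads only what earlier steps produced and returns only the keys it owns, which keeps the dependencies between stages easy to follow.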

Pattern 2: Parallel Agents

Multiple agents work simultaneously, then results are aggregated.

from typing import List
import asyncio

class ParallelState(TypedDict):
    """State for parallel execution"""
    task: str
    specialist_results: List[str]
    aggregated_result: str

# Specialist agents
def security_specialist(task: str) -> str:
    """Analyzes security implications"""
    prompt = f"""You are a security specialist. Analyze the security implications of: {task}

    Focus on:
    - Vulnerabilities
    - Attack vectors
    - Mitigation strategies"""

    response = llm.invoke([SystemMessage(content=prompt)])
    return f"Security Analysis:\n{response.content}"

def performance_specialist(task: str) -> str:
    """Analyzes performance implications"""
    prompt = f"""You are a performance specialist. Analyze the performance implications of: {task}

    Focus on:
    - Scalability
    - Bottlenecks
    - Optimization opportunities"""

    response = llm.invoke([SystemMessage(content=prompt)])
    return f"Performance Analysis:\n{response.content}"

def cost_specialist(task: str) -> str:
    """Analyzes cost implications"""
    prompt = f"""You are a cost analyst. Analyze the cost implications of: {task}

    Focus on:
    - Implementation costs
    - Operational costs
    - ROI considerations"""

    response = llm.invoke([SystemMessage(content=prompt)])
    return f"Cost Analysis:\n{response.content}"

# Parallel execution node
async def run_specialists(state: ParallelState) -> ParallelState:
    """Run all specialists in parallel"""
    task = state["task"]

    # Execute all specialists concurrently
    results = await asyncio.gather(
        asyncio.to_thread(security_specialist, task),
        asyncio.to_thread(performance_specialist, task),
        asyncio.to_thread(cost_specialist, task)
    )

    state["specialist_results"] = results
    return state

# Aggregator agent
def aggregator_agent(state: ParallelState) -> ParallelState:
    """Aggregates specialist insights"""
    all_results = "\n\n".join(state["specialist_results"])

    prompt = f"""You are a senior architect. Review these specialist analyses:

    {all_results}

    Create a comprehensive summary that:
    - Synthesizes all perspectives
    - Identifies trade-offs
    - Provides actionable recommendations
    - Highlights critical concerns"""

    response = llm.invoke([SystemMessage(content=prompt)])
    state["aggregated_result"] = response.content

    return state

# Build parallel workflow
workflow = StateGraph(ParallelState)

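def run_specialists_sync(state: ParallelState) -> ParallelState:
    """Sync entry point for the async node. asyncio.run() raises if an event
    loop is already running (e.g. in a notebook); in that case register
    run_specialists directly and call the graph with await app.ainvoke(...)."""
    return asyncio.run(run_specialists(state))

workflow.add_node("specialists", run_specialists_sync)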
workflow.add_node("aggregator", aggregator_agent)

workflow.set_entry_point("specialists")
workflow.add_edge("specialists", "aggregator")
workflow.add_edge("aggregator", END)

app = workflow.compile()

# Execute
result = app.invoke({
    "task": "Implementing a new microservices architecture",
    "specialist_results": [],
    "aggregated_result": ""
})

print(result["aggregated_result"])
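
One thing the parallel pattern has to decide is what happens when a single specialist fails. The sketch below uses only the standard library, with two made-up specialists (`security_check` and `flaky_check`) and a hypothetical `run_all` helper, to show one option: collect exceptions instead of letting the first one abort the whole batch.

```python
import asyncio
from typing import Callable, List

# Hypothetical specialists; real ones would call an LLM.
def security_check(task: str) -> str:
    return f"Security Analysis: {task}"

def flaky_check(task: str) -> str:
    raise RuntimeError("upstream timeout")

async def run_all(task: str, specialists: List[Callable[[str], str]]) -> List[str]:
    """Run blocking specialists in worker threads and keep going if one fails."""
    outcomes = await asyncio.gather(
        *(asyncio.to_thread(fn, task) for fn in specialists),
        return_exceptions=True,  # exceptions are returned as values, not raised
    )
    results = []
    # gather() preserves input order, so outcomes line up with specialists.
    for fn, outcome in zip(specialists, outcomes):
        if isinstance(outcome, Exception):
            results.append(f"{fn.__name__} failed: {outcome}")
        else:
            results.append(outcome)
    return results

results = asyncio.run(
    run_all("new microservices architecture", [security_check, flaky_check])
)
print(results)
```

With this shape the aggregator still receives the analyses that succeeded, plus an explicit note about the one that did not.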

Pattern 3: Hierarchical Agents

A manager agent plans the work and coordinates worker agents.

class HierarchicalState(TypedDict):
    """State for hierarchical execution"""
    user_request: str
    plan: List[dict]
    worker_results: List[dict]
    final_response: str

@tool
def execute_code(code: str) -> str:
    """Execute Python code safely."""
    # In production, use sandboxed execution
    return f"Code executed: {code[:100]}..."

@tool
def search_api(endpoint: str) -> str:
    """Call external API."""
    return f"API response from {endpoint}"

@tool
def file_operations(operation: str, path: str) -> str:
    """Perform file operations."""
    return f"File operation: {operation} on {path}"

# Manager Agent
def manager_agent(state: HierarchicalState) -> HierarchicalState:
    """Plans and coordinates work"""
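    prompt = f"""You are a manager agent coordinating a team of specialist workers:

    - coder: Writes and executes code
    - api_specialist: Interacts with external APIs
    - file_manager: Handles file operations

    User Request: {state['user_request']}

    Create a detailed plan breaking down the work into tasks for each specialist.
    Return only a JSON list of tasks with format:
    [{{"agent": "coder", "task": "...", "tools": ["execute_code"]}}, ...]"""

    response = llm.invoke([SystemMessage(content=prompt)])

    # Parse the plan, keeping only tasks addressed to a known worker.
    # Fall back to a fixed plan if the reply is not usable JSON.
    import json
    known_agents = {"coder", "api_specialist", "file_manager"}
    try:
        parsed = json.loads(response.content)
    except (json.JSONDecodeError, TypeError):
        parsed = []
    if not isinstance(parsed, list):
        parsed = []
    plan = [
        t for t in parsed
        if isinstance(t, dict)
        and isinstance(t.get("agent"), str)
        and t["agent"] in known_agents
        and "task" in t
    ]

    state["plan"] = plan or [
        {"agent": "coder", "task": "Write data processing script"},
        {"agent": "api_specialist", "task": "Fetch data from API"},
        {"agent": "file_manager", "task": "Save results to file"}
    ]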

    return state

# Worker Agents
def coder_agent(task: dict) -> dict:
    """Specialized coding agent"""
    # Pass the task as the human message so braces in it are not parsed
    # as template variables
    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are an expert programmer. Complete the task you are given. "
                   "Write production-quality code with error handling."),
        ("human", "{input}"),
        MessagesPlaceholder(variable_name="agent_scratchpad"),
    ])

    agent = create_openai_functions_agent(llm, [execute_code], prompt)
    executor = AgentExecutor(agent=agent, tools=[execute_code])
    result = executor.invoke({"input": task["task"]})

    return {"agent": "coder", "result": result["output"]}

def api_specialist_agent(task: dict) -> dict:
    """Specialized API interaction agent"""
    # Pass the task as the human message so braces in it are not parsed
    # as template variables
    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are an API integration specialist. Complete the task you are given. "
                   "Use best practices for API calls."),
        ("human", "{input}"),
        MessagesPlaceholder(variable_name="agent_scratchpad"),
    ])

    agent = create_openai_functions_agent(llm, [search_api], prompt)
    executor = AgentExecutor(agent=agent, tools=[search_api])
    result = executor.invoke({"input": task["task"]})

    return {"agent": "api_specialist", "result": result["output"]}

def file_manager_agent(task: dict) -> dict:
    """Specialized file operations agent"""
    # Pass the task as the human message so braces in it are not parsed
    # as template variables
    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are a file management specialist. Complete the task you are given. "
                   "Ensure proper file handling and error checking."),
        ("human", "{input}"),
        MessagesPlaceholder(variable_name="agent_scratchpad"),
    ])

    agent = create_openai_functions_agent(llm, [file_operations], prompt)
    executor = AgentExecutor(agent=agent, tools=[file_operations])
    result = executor.invoke({"input": task["task"]})

    return {"agent": "file_manager", "result": result["output"]}

# Worker dispatcher
def dispatch_workers(state: HierarchicalState) -> HierarchicalState:
    """Execute planned tasks with appropriate workers"""
    agent_map = {
        "coder": coder_agent,
        "api_specialist": api_specialist_agent,
        "file_manager": file_manager_agent
    }

    results = []
    for task in state["plan"]:
        agent_func = agent_map[task["agent"]]
        result = agent_func(task)
        results.append(result)

    state["worker_results"] = results
    return state

# Synthesizer
def synthesize_results(state: HierarchicalState) -> HierarchicalState:
    """Combine worker results into final response"""
    results_text = "\n\n".join([
        f"{r['agent']}: {r['result']}" for r in state["worker_results"]
    ])

    prompt = f"""Synthesize these worker results into a cohesive response:

    {results_text}

    Original request: {state['user_request']}

    Provide a clear summary of what was accomplished."""

    response = llm.invoke([SystemMessage(content=prompt)])
    state["final_response"] = response.content

    return state

# Build hierarchical workflow
workflow = StateGraph(HierarchicalState)

workflow.add_node("manager", manager_agent)
workflow.add_node("workers", dispatch_workers)
workflow.add_node("synthesizer", synthesize_results)

workflow.set_entry_point("manager")
workflow.add_edge("manager", "workers")
workflow.add_edge("workers", "synthesizer")
workflow.add_edge("synthesizer", END)

app = workflow.compile()

# Execute
result = app.invoke({
    "user_request": "Fetch user data from API, process it, and save to CSV",
    "plan": [],
    "worker_results": [],
    "final_response": ""
})

print(result["final_response"])
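
The manager's plan arrives as free text, so it is worth parsing defensively before dispatching anything. The following is a sketch under stated assumptions: `parse_plan` and `KNOWN_AGENTS` are names invented for this example, and the rule of keeping only the "agent" and "task" fields is one possible choice among many.

```python
import json
import re
from typing import Dict, List

# Worker names the dispatcher knows how to run (assumed for this example).
KNOWN_AGENTS = {"coder", "api_specialist", "file_manager"}

def parse_plan(reply: str) -> List[Dict[str, str]]:
    """Extract a JSON task list from a model reply and keep only valid tasks."""
    # Models often wrap JSON in prose; grab the outermost [...] span.
    match = re.search(r"\[.*\]", reply, re.DOTALL)
    if match is None:
        return []
    try:
        tasks = json.loads(match.group(0))
    except json.JSONDecodeError:
        return []
    if not isinstance(tasks, list):
        return []

    valid = []
    for task in tasks:
        if (
            isinstance(task, dict)
            and isinstance(task.get("agent"), str)
            and task["agent"] in KNOWN_AGENTS
            and isinstance(task.get("task"), str)
        ):
            valid.append({"agent": task["agent"], "task": task["task"]})
    return valid

reply = 'Plan: [{"agent": "coder", "task": "parse CSV"}, {"agent": "wizard", "task": "magic"}] Done.'
print(parse_plan(reply))
```

An empty list signals that the reply was unusable, which lets the caller decide whether to re-prompt the manager or fall back to a default plan.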

Pattern 4: Autonomous Agents with Feedback Loops

Agents that self-correct and improve through iteration.

class AutonomousState(TypedDict):
    """State for autonomous agent with feedback"""
    objective: str
    current_attempt: int
    max_attempts: int
    solution: str
    validation_result: dict
    final_solution: str

def problem_solver(state: AutonomousState) -> AutonomousState:
    """Generate solution attempt"""
    previous_feedback = ""
    if state["current_attempt"] > 0:
        last = state["validation_result"]
        previous_feedback = f"""
        Previous attempt failed with:
        {last.get('error') or last.get('feedback', '')}

        Improve your solution based on this feedback.
        """

    prompt = f"""You are a problem-solving agent.

    Objective: {state['objective']}
    Attempt: {state['current_attempt'] + 1}/{state['max_attempts']}

    {previous_feedback}

    Provide a complete solution."""

    response = llm.invoke([SystemMessage(content=prompt)])

    state["solution"] = response.content
    state["current_attempt"] += 1

    return state

def validator(state: AutonomousState) -> AutonomousState:
    """Validate solution quality"""
    prompt = f"""You are a quality assurance agent. Validate this solution:

    Objective: {state['objective']}
    Solution: {state['solution']}

    Check for:
    - Correctness
    - Completeness
    - Edge cases
    - Best practices

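    Return only a JSON object of the form:
    {{"valid": true/false, "score": 0-100, "feedback": "...", "error": "..."}}"""

    response = llm.invoke([SystemMessage(content=prompt)])

    # Parse validation (simplified); treat unparseable output as a failed check
    import json
    parse_failure = {"valid": False, "score": 0, "feedback": "Parse error",
                     "error": "Validator reply was not a JSON object"}
    try:
        validation = json.loads(response.content)
    except (json.JSONDecodeError, TypeError):
        validation = parse_failure
    if not isinstance(validation, dict):
        validation = parse_failure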

    state["validation_result"] = validation

    return state

def should_continue(state: AutonomousState) -> str:
    """Decision point: continue iterating or finish"""
    if state["validation_result"].get("valid", False):
        return "finish"
    elif state["current_attempt"] >= state["max_attempts"]:
        return "max_attempts"
    else:
        return "retry"

def finalize_solution(state: AutonomousState) -> AutonomousState:
    """Finalize successful solution"""
    state["final_solution"] = state["solution"]
    return state

# Build autonomous workflow with feedback loop
workflow = StateGraph(AutonomousState)

workflow.add_node("solver", problem_solver)
workflow.add_node("validator", validator)
workflow.add_node("finalize", finalize_solution)

workflow.set_entry_point("solver")
workflow.add_edge("solver", "validator")

# Conditional branching
workflow.add_conditional_edges(
    "validator",
    should_continue,
    {
        "retry": "solver",      # Try again
        "finish": "finalize",   # Success
        "max_attempts": END     # Give up
    }
)

workflow.add_edge("finalize", END)

app = workflow.compile()

# Execute with self-correction
result = app.invoke({
    "objective": "Write a function to efficiently find duplicates in a large list",
    "current_attempt": 0,
    "max_attempts": 3,
    "solution": "",
    "validation_result": {},
    "final_solution": ""
})

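# final_solution stays empty when the loop gives up at max_attempts
if result["final_solution"]:
    print(f"Solution found after {result['current_attempt']} attempts:")
    print(result["final_solution"])
else:
    print(f"No valid solution after {result['current_attempt']} attempts")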
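
The control flow of the feedback loop is easier to reason about when stripped of the graph and the model. This is a minimal sketch with deterministic stubs; `solve_with_feedback`, `stub_solver`, and `stub_validator` are hypothetical names introduced here, not part of LangChain or LangGraph.

```python
from typing import Callable, Dict, Tuple

def solve_with_feedback(
    solver: Callable[[str, str], str],
    validator: Callable[[str], Dict],
    objective: str,
    max_attempts: int = 3,
) -> Tuple[str, int, bool]:
    """Retry until the validator accepts a solution or attempts run out.

    Returns (last_solution, attempts_used, accepted).
    """
    feedback = ""
    solution = ""
    for attempt in range(1, max_attempts + 1):
        solution = solver(objective, feedback)
        verdict = validator(solution)
        if verdict.get("valid", False):
            return solution, attempt, True
        # Carry the validator's feedback into the next attempt.
        feedback = verdict.get("feedback", "")
    return solution, max_attempts, False

# Deterministic stubs standing in for the LLM-backed solver and validator.
def stub_solver(objective: str, feedback: str) -> str:
    return "use a set" if "set" in feedback else "nested loops"

def stub_validator(solution: str) -> Dict:
    if "set" in solution:
        return {"valid": True, "feedback": ""}
    return {"valid": False, "feedback": "too slow; try a set"}

solution, attempts, accepted = solve_with_feedback(
    stub_solver, stub_validator, "find duplicates in a large list"
)
print(solution, attempts, accepted)
```

Returning the `accepted` flag alongside the solution makes the "gave up" case explicit, so callers cannot mistake a rejected last attempt for a success.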

Advanced Patterns

Memory and Context Sharing

from langchain.memory import ConversationBufferMemory

class SharedMemoryState(TypedDict):
    """State with shared memory across agents"""
    conversation_history: list
    shared_knowledge: dict

memory = ConversationBufferMemory(
    memory_key="chat_history",
    return_messages=True
)

def agent_with_memory(state: SharedMemoryState, agent_name: str) -> SharedMemoryState:
    """Agent that maintains conversation context"""
    # Load previous context
    history = state["conversation_history"]

    prompt = f"""You are {agent_name}. You have access to the conversation history:

    {history}

    And shared knowledge base:
    {state['shared_knowledge']}

    Use this context to provide informed responses."""

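    # ... agent logic with memory access ...

    return state

# Graph nodes receive only the state, so bind agent_name before registering, e.g.:
# workflow.add_node("planner", functools.partial(agent_with_memory, agent_name="planner"))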
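
Interpolating the whole conversation history into every prompt grows without bound as agents take turns. One simple mitigation, sketched here with a hypothetical `trim_history` helper and a character budget standing in for a real token count, is to keep only the most recent entries that fit.

```python
from typing import List

def trim_history(history: List[str], max_chars: int) -> List[str]:
    """Keep the most recent entries whose combined length fits within max_chars."""
    kept: List[str] = []
    used = 0
    # Walk backwards from the newest entry and stop at the first one that
    # would push the total over the budget.
    for entry in reversed(history):
        if used + len(entry) > max_chars:
            break
        kept.append(entry)
        used += len(entry)
    kept.reverse()  # restore chronological order
    return kept

history = ["aaaa", "bbb", "cc"]
print(trim_history(history, 5))
```

A production system would more likely count tokens with the model's tokenizer, or summarize older turns instead of dropping them.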

Dynamic Tool Selection

@tool
def get_weather(location: str) -> str:
    """Get current weather for location."""
    return f"Weather in {location}: Sunny, 72°F"

@tool
def get_stock_price(symbol: str) -> str:
    """Get current stock price."""
    return f"Stock {symbol}: $150.00"

@tool
def calculate(expression: str) -> str:
    """Evaluate mathematical expression."""
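    try:
        # WARNING: eval() executes arbitrary code. Never pass it model- or
        # user-supplied input; use a restricted evaluator in production.
        result = eval(expression)
        return str(result)
    except Exception:
        return "Invalid expression"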

ALL_TOOLS = {
    "weather": get_weather,
    "stocks": get_stock_price,
    "math": calculate
}

def tool_selector_agent(query: str) -> list:
    """Dynamically select which tools are needed"""
    prompt = f"""Given this query: "{query}"

    Available tools: {list(ALL_TOOLS.keys())}

    Which tools would be most useful? Return as JSON list: ["tool1", "tool2"]"""

    response = llm.invoke([SystemMessage(content=prompt)])

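    # Parse and return selected tools; fall back to all tools if the
    # reply is not a JSON list
    import json
    try:
        selected = json.loads(response.content)
    except (json.JSONDecodeError, TypeError):
        return list(ALL_TOOLS.values())
    if not isinstance(selected, list):
        return list(ALL_TOOLS.values())
    return [ALL_TOOLS[name] for name in selected if isinstance(name, str) and name in ALL_TOOLS]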

# Use dynamically selected tools
query = "What's the weather in NYC and AAPL stock price?"
tools = tool_selector_agent(query)
# Create agent with only relevant tools...
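
The `calculate` tool relies on `eval`, which is unsafe for model-generated input. One possible replacement is sketched below, assuming the tool only needs basic arithmetic: it parses the expression with the standard-library `ast` module and evaluates only numeric literals and a small set of operators. The name `safe_calculate` and the supported operator set are choices made for this example.

```python
import ast
import operator

# Only these operators are evaluated; everything else is rejected.
_BINARY_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
}
_UNARY_OPS = {ast.UAdd: operator.pos, ast.USub: operator.neg}

def _evaluate(node: ast.AST) -> float:
    """Recursively evaluate an arithmetic-only syntax tree."""
    if isinstance(node, ast.Constant):
        # bool is a subclass of int, so exclude it explicitly
        if isinstance(node.value, (int, float)) and not isinstance(node.value, bool):
            return node.value
    elif isinstance(node, ast.BinOp) and type(node.op) in _BINARY_OPS:
        return _BINARY_OPS[type(node.op)](_evaluate(node.left), _evaluate(node.right))
    elif isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
        return _UNARY_OPS[type(node.op)](_evaluate(node.operand))
    raise ValueError("unsupported expression")

def safe_calculate(expression: str) -> str:
    """Evaluate +, -, *, / arithmetic without executing arbitrary code."""
    try:
        tree = ast.parse(expression, mode="eval")
        return str(_evaluate(tree.body))
    except (SyntaxError, ValueError, ZeroDivisionError, OverflowError, RecursionError):
        return "Invalid expression"

print(safe_calculate("2 + 3 * 4"))
```

Function calls, attribute access, and names all fall through to the `ValueError` branch, so an input like `__import__('os')` is rejected rather than executed. Exponentiation is left out on purpose, since large powers can consume unbounded time and memory.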

Production Considerations

1. Error Handling

import functools
import time

def robust_agent_wrapper(agent_func):
    """Retry an agent with exponential backoff; record the error in state on final failure"""
    @functools.wraps(agent_func)
    def wrapper(state):
        max_retries = 3
        for attempt in range(max_retries):
            try:
                return agent_func(state)
            except Exception as e:
                print(f"Agent error (attempt {attempt + 1}): {e}")
                if attempt == max_retries - 1:
                    # Final attempt failed: surface the error through the state
                    # ("error" and "status" must be part of your state schema)
                    state["error"] = str(e)
                    state["status"] = "failed"
                    return state
                # Exponential backoff: 1s, then 2s
                time.sleep(2 ** attempt)
        return state
    return wrapper

@robust_agent_wrapper
def fallible_agent(state):
    # Agent that might fail
    pass
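
Retrying on every exception can hide genuine bugs, and a hard-coded `time.sleep` makes the wrapper slow to test. The variant below is a sketch, not a drop-in from any library: the `retry` decorator factory, its parameters, and the `flaky_agent` example are all hypothetical. It restricts retries to chosen exception types and accepts an injectable `sleep` function.

```python
import functools
import time
from typing import Callable, Tuple, Type

def retry(
    max_retries: int = 3,
    base_delay: float = 1.0,
    retry_on: Tuple[Type[Exception], ...] = (Exception,),
    sleep: Callable[[float], None] = time.sleep,
):
    """Build a decorator that retries an agent with exponential backoff."""
    def decorator(agent_func):
        @functools.wraps(agent_func)
        def wrapper(state: dict) -> dict:
            for attempt in range(max_retries):
                try:
                    return agent_func(state)
                except retry_on as exc:
                    if attempt == max_retries - 1:
                        # Out of retries: report failure through the state
                        return {**state, "status": "failed", "error": str(exc)}
                    sleep(base_delay * 2 ** attempt)
            # Only reached when max_retries is zero or negative
            return {**state, "status": "failed", "error": "no attempts made"}
        return wrapper
    return decorator

# Usage with a recorded sleep so the example runs instantly
delays = []
attempts = {"count": 0}

@retry(max_retries=3, base_delay=0.5, retry_on=(TimeoutError,), sleep=delays.append)
def flaky_agent(state: dict) -> dict:
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise TimeoutError("model timed out")
    return {**state, "status": "ok"}

outcome = flaky_agent({"query": "hello"})
print(outcome, delays)
```

Exceptions outside `retry_on` propagate immediately, so a programming error such as a `KeyError` fails fast instead of being retried.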

2. Cost Management

from langchain_community.callbacks import get_openai_callback

def cost_aware_execution(workflow_app, state):
    """Track token usage and cost, and warn above a threshold"""
    with get_openai_callback() as cb:
        result = workflow_app.invoke(state)

        print("Cost Summary:")
        print(f"  Tokens: {cb.total_tokens}")
        print(f"  Cost: ${cb.total_cost:.4f}")

        if cb.total_cost > 1.00:  # $1 threshold
            print("⚠️  Warning: High cost detected")

    return result
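
The callback above reports cost after the workflow has finished. To stop a run partway through, one option is a small budget object that each agent updates after every model call. This is a sketch under stated assumptions: `CostBudget` and `BudgetExceeded` are hypothetical names, and the per-1K-token prices in the usage lines are placeholders, not real pricing.

```python
class BudgetExceeded(RuntimeError):
    """Raised when cumulative spend passes the configured limit."""

class CostBudget:
    """Accumulate per-call cost and stop the workflow once over the limit."""

    def __init__(self, limit_usd: float) -> None:
        self.limit_usd = limit_usd
        self.spent_usd = 0.0

    def record(
        self,
        prompt_tokens: int,
        completion_tokens: int,
        usd_per_1k_prompt: float,
        usd_per_1k_completion: float,
    ) -> float:
        """Add one call's cost to the running total; raise once over the limit."""
        cost = (
            prompt_tokens / 1000 * usd_per_1k_prompt
            + completion_tokens / 1000 * usd_per_1k_completion
        )
        # The call has already happened, so record the spend before checking.
        self.spent_usd += cost
        if self.spent_usd > self.limit_usd:
            raise BudgetExceeded(
                f"spent ${self.spent_usd:.4f} of ${self.limit_usd:.2f} budget"
            )
        return cost

# Placeholder prices, chosen only to make the arithmetic easy to follow
budget = CostBudget(limit_usd=0.10)
first_cost = budget.record(1000, 500, usd_per_1k_prompt=0.03, usd_per_1k_completion=0.06)
print(f"first call: ${first_cost:.2f}, total: ${budget.spent_usd:.2f}")
```

Here the first call costs 0.03 + 0.03 = 0.06 dollars; a second identical call would bring the total to 0.12 and raise `BudgetExceeded`, which the workflow can catch to end the run early.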

3. Observability

from datetime import datetime

class AgentTracer:
    """Track agent execution for debugging"""
    def __init__(self):
        self.trace = []

    def log_agent_start(self, agent_name, input_data):
        self.trace.append({
            "timestamp": datetime.now(),
            "event": "start",
            "agent": agent_name,
            "input": input_data
        })

    def log_agent_end(self, agent_name, output_data):
        self.trace.append({
            "timestamp": datetime.now(),
            "event": "end",
            "agent": agent_name,
            "output": output_data
        })

    def export_trace(self):
        """Export trace for analysis"""
        return self.trace

# Use in agents
tracer = AgentTracer()

def traced_agent(state):
    tracer.log_agent_start("my_agent", state)
    result = state  # ... agent logic goes here; this placeholder passes state through ...
    tracer.log_agent_end("my_agent", result)
    return result
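
Calling start and end hooks by hand is easy to get wrong when an agent raises. A context manager can record the outcome and duration in one place. The sketch below is a standard-library illustration; `SpanTracer` and its `span` method are hypothetical names, and the injectable `clock` exists only to make timing testable.

```python
import time
from contextlib import contextmanager

class SpanTracer:
    """Record one entry per agent run, including duration and failures."""

    def __init__(self, clock=time.perf_counter):
        self.spans = []
        self._clock = clock

    @contextmanager
    def span(self, agent_name: str):
        start = self._clock()
        record = {"agent": agent_name, "status": "ok"}
        try:
            # The caller can attach extra fields (e.g. output) to the record.
            yield record
        except Exception as exc:
            record["status"] = "error"
            record["error"] = str(exc)
            raise  # tracing must not swallow the failure
        finally:
            record["duration_s"] = self._clock() - start
            self.spans.append(record)

span_tracer = SpanTracer()
with span_tracer.span("my_agent") as record:
    record["output"] = "agent result"
print(span_tracer.spans[0]["status"])
```

Because the bookkeeping lives in `finally`, a span is recorded whether the agent returns normally or raises.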

Testing Multi-Agent Systems

import pytest

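def test_sequential_workflow():
    """Test sequential agent execution"""
    # `app` must be the compiled sequential workflow from Pattern 1; later
    # sections reuse the name, so import or rebuild that graph for this test.
    # Note: as written, this is an integration test that calls the real LLM.
    state = {
        "query": "test query",
        "raw_data": "",
        "analyzed_data": "",
        "final_report": "",
        "messages": []
    }

    result = app.invoke(state)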

    assert result["raw_data"] != ""
    assert result["analyzed_data"] != ""
    assert result["final_report"] != ""
    assert len(result["messages"]) == 3

def test_agent_isolation():
    """Ensure agents don't interfere with each other"""
    # Test that one agent's failure doesn't break others
    pass

def test_state_consistency():
    """Verify state remains consistent across agents"""
    # Validate state schema at each step
    pass
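
Tests that hit a live model are slow, costly, and nondeterministic. A common alternative is to inject a fake model into the node under test. The example below is a sketch: `FakeLLM` and `make_analysis_node` are hypothetical helpers that imitate only the `.invoke(...)` call and `.content` attribute used in this post, not the full LangChain chat model interface.

```python
from types import SimpleNamespace

class FakeLLM:
    """Stand-in that returns canned replies and records what it was asked."""

    def __init__(self, replies):
        self._replies = list(replies)
        self.calls = []

    def invoke(self, messages):
        self.calls.append(messages)
        return SimpleNamespace(content=self._replies.pop(0))

def make_analysis_node(llm):
    """Build an analysis node around whichever model is passed in."""
    def analysis_node(state: dict) -> dict:
        response = llm.invoke([f"Analyze: {state['raw_data']}"])
        return {
            **state,
            "analyzed_data": response.content,
            "messages": state["messages"] + ["Analysis completed"],
        }
    return analysis_node

def test_analysis_node_uses_llm_reply():
    fake = FakeLLM(["three key findings"])
    node = make_analysis_node(fake)

    result = node({"raw_data": "raw notes", "analyzed_data": "", "messages": []})

    assert result["analyzed_data"] == "three key findings"
    assert result["messages"] == ["Analysis completed"]
    assert fake.calls == [["Analyze: raw notes"]]
```

Building nodes through a factory that takes the model as a parameter is the design choice that makes this possible; nodes that read a module-level `llm` would need patching instead.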

Conclusion

Multi-agent systems represent the next evolution in AI applications. By combining specialized agents, you can:

  • Solve complex problems that overwhelm single agents
  • Improve reliability through redundancy and validation
  • Scale capabilities by adding new specialized agents
  • Maintain systems by isolating agent responsibilities

LangChain provides the building blocks, but the art lies in:

  • Choosing the right architecture pattern
  • Designing effective agent interactions
  • Managing state and memory appropriately
  • Building robust error handling
  • Optimizing for cost and performance

Start simple with sequential agents, then evolve to parallel and hierarchical patterns as your needs grow. The future of AI is collaborative, and multi-agent systems are how we get there.


Resources

  • LangChain Documentation: Comprehensive guides and API reference
  • LangGraph: Advanced orchestration patterns
  • Research Papers: “Communicating Agents” literature
  • GitHub Examples: langchain-ai/langgraph examples repository

Build agents that work together, and you’ll build AI that works better.
