PyTorch Neural Routing in Production
Cortex uses a hybrid routing system: pattern matching for interpretability + neural networks for complex patterns.
Today, we’ll explore the PyTorch neural routing layer.
Why Neural Routing?
Pattern matching is great, but has limitations:
Pattern Matching Strengths ✅
- Interpretable (you see why decisions were made)
- Fast (simple keyword matching)
- No training required
- Works immediately
Pattern Matching Weaknesses ❌
- Can’t learn complex patterns
- Limited to explicit rules
- Struggles with ambiguity
- No semantic understanding
Neural routing solves these limitations.
The Hybrid Architecture
The routing system combines three complementary approaches for optimal task assignment:
graph LR
A[Task Input] --> B[Pattern Router<br/>60%]
A --> C[Neural Router<br/>30%]
A --> D[Domain Router<br/>10%]
B --> E[Weighted<br/>Combiner]
C --> E
D --> E
E --> F[Final Routing<br/>Decision]
style A fill:#30363d,stroke:#58a6ff,stroke-width:2px
style B fill:#30363d,stroke:#58a6ff,stroke-width:2px
style C fill:#30363d,stroke:#58a6ff,stroke-width:2px
style D fill:#30363d,stroke:#58a6ff,stroke-width:2px
style E fill:#30363d,stroke:#f85149,stroke-width:2px
style F fill:#30363d,stroke:#00d084,stroke-width:2px
function route(task) {
  // 60% pattern matching
  const patternScore = patternRouter.route(task);

  // 30% neural network
  const neuralScore = neuralRouter.route(task);

  // 10% domain rules
  const domainScore = domainRouter.route(task);

  // Weighted combination
  return combineScores(patternScore, neuralScore, domainScore);
}
Result: Best of both worlds
Neural Network Architecture
Input Layer
Task description → Embedding:
import torch
import torch.nn as nn
from transformers import AutoTokenizer, AutoModel

class TaskEmbedder(nn.Module):
    def __init__(self):
        super().__init__()
        self.tokenizer = AutoTokenizer.from_pretrained(
            'sentence-transformers/all-MiniLM-L6-v2'
        )
        self.encoder = AutoModel.from_pretrained(
            'sentence-transformers/all-MiniLM-L6-v2'
        )

    def forward(self, task_description):
        # Tokenize
        inputs = self.tokenizer(
            task_description,
            padding=True,
            truncation=True,
            return_tensors='pt'
        )

        # Encode
        with torch.no_grad():
            outputs = self.encoder(**inputs)

        # Mean pooling
        embeddings = outputs.last_hidden_state.mean(dim=1)

        return embeddings  # Shape: (batch_size, 384)
Example:
embedder = TaskEmbedder()

task = "Fix authentication bug in login system"
embedding = embedder(task)
# Shape: (1, 384)
# Vector representation of the task
Routing Network
Embedding → Master confidence scores:
class NeuralRouter(nn.Module):
    def __init__(self, embedding_dim=384, num_masters=5):
        super().__init__()
        self.network = nn.Sequential(
            # Input: 384-dim embedding
            nn.Linear(embedding_dim, 256),
            nn.ReLU(),
            nn.Dropout(0.2),

            # Hidden layer
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Dropout(0.2),

            # Output: confidence for each master
            nn.Linear(128, num_masters),
            nn.Softmax(dim=1)
        )

    def forward(self, task_embedding):
        # Returns confidence distribution
        return self.network(task_embedding)
Example:
router = NeuralRouter()
confidence = router(embedding)
# Output: [0.72, 0.15, 0.08, 0.03, 0.02]
# [dev, sec, cicd, inv, coord]
Neural Network Flow
The neural network architecture processes task embeddings through multiple layers:
graph TD
A[Task Description] --> B[Tokenizer]
B --> C[Sentence Transformer<br/>all-MiniLM-L6-v2]
C --> D[384-dim Embedding]
D --> E[Linear Layer<br/>384 → 256]
E --> F[ReLU + Dropout]
F --> G[Linear Layer<br/>256 → 128]
G --> H[ReLU + Dropout]
H --> I[Linear Layer<br/>128 → 5]
I --> J[Softmax]
J --> K[Master Confidence<br/>Scores]
style A fill:#30363d,stroke:#58a6ff,stroke-width:2px
style D fill:#30363d,stroke:#00d084,stroke-width:2px
style K fill:#30363d,stroke:#00d084,stroke-width:2px
style B fill:#30363d,stroke:#58a6ff,stroke-width:2px
style C fill:#30363d,stroke:#58a6ff,stroke-width:2px
style E fill:#30363d,stroke:#58a6ff,stroke-width:2px
style F fill:#30363d,stroke:#58a6ff,stroke-width:2px
style G fill:#30363d,stroke:#58a6ff,stroke-width:2px
style H fill:#30363d,stroke:#58a6ff,stroke-width:2px
style I fill:#30363d,stroke:#58a6ff,stroke-width:2px
style J fill:#30363d,stroke:#58a6ff,stroke-width:2px
Complete Pipeline
class HybridRouter:
    def __init__(self):
        self.embedder = TaskEmbedder()
        self.neural_router = NeuralRouter()
        self.pattern_router = PatternRouter()
        self.domain_router = DomainRouter()

    def route(self, task_description):
        # Neural routing (30%) - squeeze the batch dim so scores index cleanly
        embedding = self.embedder(task_description)
        neural_scores = self.neural_router(embedding).squeeze(0)

        # Pattern routing (60%)
        pattern_scores = self.pattern_router.route(task_description)

        # Domain routing (10%)
        domain_scores = self.domain_router.route(task_description)

        # Combine
        final_scores = (
            neural_scores * 0.3 +
            pattern_scores * 0.6 +
            domain_scores * 0.1
        )

        # Select best master
        master_idx = torch.argmax(final_scores).item()
        confidence = final_scores[master_idx].item()

        return {
            'master': MASTERS[master_idx],
            'confidence': confidence,
            'neural_contribution': neural_scores[master_idx].item(),
            'pattern_contribution': pattern_scores[master_idx].item()
        }
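A quick usage sketch, assuming PatternRouter and DomainRouter are available as described above and that MASTERS lists the five masters in the index order used throughout this post:

# Ordered to match the master_to_idx mapping used in the dataset later on
MASTERS = ['development', 'security', 'cicd', 'inventory', 'coordinator']

router = HybridRouter()
decision = router.route("Fix authentication bug in login system")
print(decision['master'], decision['confidence'])
# e.g. development 0.81 (illustrative output, not a real measurement)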
Training Pipeline
The training pipeline is a continuous cycle that improves routing accuracy over time:
graph TD
A[Production<br/>Routing Decisions] --> B[Data Collection]
B --> C[Filter Successful<br/>Routes Only]
C --> D[Dataset<br/>Preparation]
D --> E[Train/Val Split]
E --> F[Training Loop<br/>10 Epochs]
F --> G[Validation]
G --> H{Accuracy > 90%?}
H -->|Yes| I[Model Export]
H -->|No| J[Adjust<br/>Hyperparameters]
J --> F
I --> K[Quantization]
K --> L[Production<br/>Deployment]
L --> M[A/B Testing]
M --> N{Performance<br/>Improved?}
N -->|Yes| O[Promote to<br/>Champion]
N -->|No| P[Rollback]
O --> A
P --> A
style A fill:#30363d,stroke:#58a6ff,stroke-width:2px
style B fill:#30363d,stroke:#58a6ff,stroke-width:2px
style C fill:#30363d,stroke:#58a6ff,stroke-width:2px
style D fill:#30363d,stroke:#58a6ff,stroke-width:2px
style E fill:#30363d,stroke:#58a6ff,stroke-width:2px
style F fill:#30363d,stroke:#f85149,stroke-width:2px
style G fill:#30363d,stroke:#58a6ff,stroke-width:2px
style H fill:#30363d,stroke:#f85149,stroke-width:2px
style I fill:#30363d,stroke:#00d084,stroke-width:2px
style J fill:#30363d,stroke:#f85149,stroke-width:2px
style K fill:#30363d,stroke:#58a6ff,stroke-width:2px
style L fill:#30363d,stroke:#58a6ff,stroke-width:2px
style M fill:#30363d,stroke:#f85149,stroke-width:2px
style N fill:#30363d,stroke:#f85149,stroke-width:2px
style O fill:#30363d,stroke:#00d084,stroke-width:2px
style P fill:#30363d,stroke:#f85149,stroke-width:2px
Data Collection
Collect from production routing history:
# coordination/knowledge-base/routing-decisions.jsonl
{"task": "Fix auth bug", "master": "development", "success": true, "quality": 0.92}
{"task": "Scan CVE-2024-001", "master": "security", "success": true, "quality": 0.95}
{"task": "Deploy to prod", "master": "cicd", "success": true, "quality": 0.88}
...
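The post doesn't show how these records get written; a minimal append-only logger might look like this (the helper name and default path are assumptions for illustration):

import json

def log_routing_decision(task, master, success, quality,
                         path='coordination/knowledge-base/routing-decisions.jsonl'):
    # Append one JSON object per line so the file stays easy to stream
    record = {'task': task, 'master': master, 'success': success, 'quality': quality}
    with open(path, 'a') as f:
        f.write(json.dumps(record) + '\n')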
Dataset Preparation
import pandas as pd
from torch.utils.data import Dataset, DataLoader

class RoutingDataset(Dataset):
    def __init__(self, jsonl_file):
        self.data = pd.read_json(jsonl_file, lines=True)

        # Filter successful routes only
        self.data = self.data[self.data['success'] == True]

        # Map masters to indices
        self.master_to_idx = {
            'development': 0,
            'security': 1,
            'cicd': 2,
            'inventory': 3,
            'coordinator': 4
        }

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        row = self.data.iloc[idx]

        task = row['task']
        master_idx = self.master_to_idx[row['master']]
        quality = row['quality']

        return {
            'task': task,
            'master': master_idx,
            'quality': quality
        }

# Create dataloaders
dataset = RoutingDataset('coordination/knowledge-base/routing-decisions.jsonl')
train_loader = DataLoader(dataset, batch_size=32, shuffle=True)
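The validation loader used in the Evaluation section below isn't shown here; one way to produce it is a simple random split (the 80/20 ratio is an assumption, not from the original pipeline):

from torch.utils.data import random_split

# 80/20 train/validation split
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_set, val_set = random_split(dataset, [train_size, val_size])

train_loader = DataLoader(train_set, batch_size=32, shuffle=True)
val_loader = DataLoader(val_set, batch_size=32)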
Training Loop
def train_router(model, embedder, train_loader, epochs=10):
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    # The router already ends in Softmax, so train on log-probabilities with
    # NLLLoss (CrossEntropyLoss expects raw logits and would double-apply softmax)
    criterion = nn.NLLLoss()

    model.train()
    for epoch in range(epochs):
        total_loss = 0

        for batch in train_loader:
            # Get embeddings
            embeddings = embedder(batch['task'])

            # Forward pass
            predictions = model(embeddings)

            # Calculate loss on log-probabilities
            loss = criterion(torch.log(predictions + 1e-9), batch['master'])

            # Backward pass
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        avg_loss = total_loss / len(train_loader)
        print(f'Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}')

    return model
Training Results
Epoch 1/10, Loss: 0.8234
Epoch 2/10, Loss: 0.6112
Epoch 3/10, Loss: 0.4521
Epoch 4/10, Loss: 0.3298
Epoch 5/10, Loss: 0.2445
Epoch 6/10, Loss: 0.1923
Epoch 7/10, Loss: 0.1589
Epoch 8/10, Loss: 0.1367
Epoch 9/10, Loss: 0.1198
Epoch 10/10, Loss: 0.1087
Final training accuracy: 94.2%
Evaluation
Validation Set Performance
def evaluate_router(model, embedder, val_loader):
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for batch in val_loader:
            embeddings = embedder(batch['task'])
            predictions = model(embeddings)

            predicted = torch.argmax(predictions, dim=1)
            correct += (predicted == batch['master']).sum().item()
            total += len(batch['master'])

    accuracy = correct / total
    return accuracy

val_accuracy = evaluate_router(model, embedder, val_loader)
print(f'Validation Accuracy: {val_accuracy:.2%}')
# Output: Validation Accuracy: 91.7%
Confusion Matrix
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

# Generate predictions
y_true = []
y_pred = []

for batch in val_loader:
    embeddings = embedder(batch['task'])
    predictions = model(embeddings)
    predicted = torch.argmax(predictions, dim=1)

    y_true.extend(batch['master'].tolist())
    y_pred.extend(predicted.tolist())

# Create confusion matrix
cm = confusion_matrix(y_true, y_pred)

# Visualize
plt.figure(figsize=(10, 8))
sns.heatmap(cm, annot=True, fmt='d',
            xticklabels=MASTERS,
            yticklabels=MASTERS)
plt.ylabel('True Master')
plt.xlabel('Predicted Master')
plt.title('Routing Confusion Matrix')
plt.savefig('confusion_matrix.png')
Results:
              dev  sec  cicd  inv  coord
development   234   12     3    0      1
security        8  156     2    1      0
cicd            4    1    98    0      0
inventory       0    0     0   45      2
coordinator     1    0     0    1     23
Interpretation: 91.7% accurate; most errors are dev ↔ security confusion, which is expected given how similar those task descriptions can be.
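To put per-class numbers on that confusion, sklearn's classification_report can be run on the same y_true/y_pred lists collected above (a sketch, not part of the original pipeline):

from sklearn.metrics import classification_report

# Per-class precision/recall from the predictions gathered for the confusion matrix
print(classification_report(y_true, y_pred, target_names=MASTERS))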
A/B Testing
The A/B testing process follows a three-stage deployment pattern for safe rollout:
graph TD
A[New Model v2.0] --> B{Shadow Mode}
B -->|All Traffic| C[Pattern Router<br/>Production]
B -->|Copy Traffic| D[Neural Router<br/>Log Only]
D --> E{Compare Results}
E -->|Pass| F{Canary Mode}
E -->|Fail| Z[Iterate]
F -->|90% Traffic| C
F -->|10% Traffic| G[Neural Router<br/>Production]
G --> H{Statistical<br/>Validation}
H -->|Pass| I{Full Rollout}
H -->|Fail| Z
I -->|100% Traffic| J[Neural Router<br/>Champion]
I --> K[Pattern Router<br/>Deprecated]
style A fill:#30363d,stroke:#58a6ff,stroke-width:2px
style B fill:#30363d,stroke:#f85149,stroke-width:2px
style C fill:#30363d,stroke:#00d084,stroke-width:2px
style D fill:#30363d,stroke:#58a6ff,stroke-width:2px
style E fill:#30363d,stroke:#f85149,stroke-width:2px
style F fill:#30363d,stroke:#f85149,stroke-width:2px
style G fill:#30363d,stroke:#58a6ff,stroke-width:2px
style H fill:#30363d,stroke:#f85149,stroke-width:2px
style I fill:#30363d,stroke:#f85149,stroke-width:2px
style J fill:#30363d,stroke:#00d084,stroke-width:2px
style K fill:#30363d,stroke:#8b949e,stroke-width:2px
style Z fill:#30363d,stroke:#f85149,stroke-width:2px
Shadow Mode
Deploy neural router alongside pattern router:
def route_with_shadow(task):
    # Pattern router (production)
    pattern_result = pattern_router.route(task)

    # Neural router (shadow - not used for decisions)
    neural_result = neural_router.route(task)

    # Log both for comparison
    log_shadow_comparison(task, pattern_result, neural_result)

    # Use pattern result
    return pattern_result
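log_shadow_comparison isn't defined in the post; a minimal sketch that records whether the two routers agree might look like this (the file path and field names are assumptions):

import json
import time

def log_shadow_comparison(task, pattern_result, neural_result,
                          path='coordination/knowledge-base/shadow-comparisons.jsonl'):
    # Record both decisions so agreement can be analyzed offline
    record = {
        'timestamp': time.time(),
        'task': task,
        'pattern_master': pattern_result['master'],
        'neural_master': neural_result['master'],
        'agree': pattern_result['master'] == neural_result['master'],
    }
    with open(path, 'a') as f:
        f.write(json.dumps(record) + '\n')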
Canary Deployment
Gradually increase neural router usage:
import random

def route_with_canary(task, canary_percent=10):
    # canary_percent% of requests use the hybrid router
    if random.random() < (canary_percent / 100):
        result = hybrid_router.route(task)
        result['router_type'] = 'hybrid'
    else:
        result = pattern_router.route(task)
        result['router_type'] = 'pattern'

    return result
Canary Schedule:
Week 1: 5% hybrid
Week 2: 10% hybrid
Week 3: 25% hybrid
Week 4: 50% hybrid
Week 5: 75% hybrid
Week 6: 100% hybrid (full rollout)
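If you want the schedule enforced in code rather than by hand, a small lookup works; this sketch (the ROLLOUT_SCHEDULE mapping is illustrative) plugs into route_with_canary above:

# Illustrative mapping from rollout week to canary percentage
ROLLOUT_SCHEDULE = {1: 5, 2: 10, 3: 25, 4: 50, 5: 75, 6: 100}

def canary_percent_for_week(week):
    # Once the schedule is exhausted, stay at full rollout
    return ROLLOUT_SCHEDULE.get(week, 100)

result = route_with_canary("Deploy to prod", canary_percent=canary_percent_for_week(3))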
Performance Comparison
{
  "pattern_only": {
    "accuracy": 0.87,
    "avg_confidence": 0.79,
    "latency_ms": 12
  },
  "hybrid": {
    "accuracy": 0.92,        // +5.7%
    "avg_confidence": 0.84,  // +6.3%
    "latency_ms": 45         // +33ms (acceptable)
  }
}
Decision: Hybrid router improves accuracy significantly, worth the latency cost.
Production Deployment
Model Serving
from fastapi import FastAPI
import uvicorn

app = FastAPI()

# Load model once at startup
embedder = TaskEmbedder()
neural_router = torch.load('models/neural_router_v1.pt')
neural_router.eval()

@app.post('/api/route')
async def route_task(task_description: str):
    # Embed task
    embedding = embedder(task_description)

    # Get neural scores (drop the batch dim so indexing works below)
    with torch.no_grad():
        neural_scores = neural_router(embedding).squeeze(0)

    # Get pattern scores (from cache or compute)
    pattern_scores = pattern_cache.get(task_description)

    # Combine
    final_scores = neural_scores * 0.3 + pattern_scores * 0.6
    master_idx = torch.argmax(final_scores).item()

    return {
        'master': MASTERS[master_idx],
        'confidence': final_scores[master_idx].item(),
        'scores': final_scores.tolist()
    }

if __name__ == '__main__':
    uvicorn.run(app, host='0.0.0.0', port=8000)
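Calling the endpoint from a client is then straightforward; a sketch with requests (FastAPI reads task_description from the query string here, since it's declared as a plain str):

import requests

# Hypothetical client call against the /api/route endpoint defined above
response = requests.post(
    'http://localhost:8000/api/route',
    params={'task_description': 'Fix authentication bug in login system'},
)
print(response.json())
# e.g. {'master': 'development', 'confidence': 0.81, 'scores': [...]}  (illustrative)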
Optimization for Production
1. Model Quantization
# Reduce model size and increase inference speed
quantized_model = torch.quantization.quantize_dynamic(
    neural_router,
    {nn.Linear},
    dtype=torch.qint8
)

# Save quantized model
torch.save(quantized_model, 'models/neural_router_quantized.pt')

# Result: 75% smaller, 2x faster inference
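If you want to verify the size reduction on your own models, comparing the serialized files is enough (a sketch; the exact savings depend on how much of the model is nn.Linear):

import os

# Compare on-disk size of the original and quantized models
original_size = os.path.getsize('models/neural_router_v1.pt')
quantized_size = os.path.getsize('models/neural_router_quantized.pt')
print(f'Original: {original_size / 1024:.1f} KB, '
      f'Quantized: {quantized_size / 1024:.1f} KB '
      f'({1 - quantized_size / original_size:.0%} smaller)')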
2. Batch Processing
from typing import List

async def route_batch(tasks: List[str]):
    # Batch embed (faster than one-by-one)
    embeddings = embedder(tasks)

    # Batch inference
    with torch.no_grad():
        scores = neural_router(embeddings)

    return scores
3. Caching
from functools import lru_cache

@lru_cache(maxsize=1000)
def get_embedding(task_description):
    return embedder(task_description)

# Repeated tasks use cached embeddings
Continuous Learning
Retraining Schedule
# Retrain every Sunday at 2 AM
import schedule

def retrain_router():
    # Load new routing data from past week
    dataset = RoutingDataset('coordination/knowledge-base/routing-decisions.jsonl')

    # Filter recent data (last 7 days)
    recent_data = dataset.filter_by_date(days=7)

    if len(recent_data) > 100:  # Only retrain if enough new data
        # Train new model
        new_model = train_router(model, embedder, recent_data, epochs=5)

        # Evaluate on validation set
        accuracy = evaluate_router(new_model, embedder, val_loader)

        if accuracy > current_accuracy:
            # Deploy new model
            torch.save(new_model, f'models/neural_router_v{version+1}.pt')
            print(f'Deployed new model v{version+1}: accuracy {accuracy:.2%}')

schedule.every().sunday.at('02:00').do(retrain_router)
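Note that the schedule library only fires jobs from a running loop, so a long-lived process has to pump it; a minimal sketch:

import time

# Keep the process alive and fire any pending scheduled jobs
while True:
    schedule.run_pending()
    time.sleep(60)  # checking once a minute is plenty for a weekly job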
Model Versioning
models/
├── neural_router_v1.pt # Initial model (91.7% accuracy)
├── neural_router_v2.pt # Week 1 retrain (92.3% accuracy)
├── neural_router_v3.pt # Week 2 retrain (92.8% accuracy)
├── neural_router_v4.pt # Week 3 retrain (93.1% accuracy)
└── neural_router_latest.pt → neural_router_v4.pt
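Promoting a new version then just means repointing the neural_router_latest.pt symlink; a hedged sketch using an atomic rename (the temp-link approach is an assumption, not shown in the post):

import os

def promote_model(version, models_dir='models'):
    # Point neural_router_latest.pt at the newly deployed version atomically
    target = f'neural_router_v{version}.pt'
    tmp_link = os.path.join(models_dir, 'neural_router_latest.pt.tmp')
    final_link = os.path.join(models_dir, 'neural_router_latest.pt')
    os.symlink(target, tmp_link)      # create the new link under a temp name
    os.replace(tmp_link, final_link)  # atomically swap it into place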
Monitoring
Track Neural Router Performance
{
  "neural_router_metrics": {
    "inference_latency_ms": {
      "p50": 38,
      "p95": 52,
      "p99": 67
    },
    "accuracy": {
      "real_time": 0.923,
      "7_day_avg": 0.918
    },
    "confidence": {
      "avg": 0.84,
      "high_confidence_rate": 0.73  // > 0.8
    },
    "fallback_rate": 0.08  // Used pattern router fallback
  }
}
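Those latency percentiles can be computed from the raw per-request measurements; a small sketch with numpy:

import numpy as np

def latency_percentiles(latencies_ms):
    # Summarize raw per-request latencies into the percentiles reported above
    return {
        'p50': float(np.percentile(latencies_ms, 50)),
        'p95': float(np.percentile(latencies_ms, 95)),
        'p99': float(np.percentile(latencies_ms, 99)),
    }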
Alerts
if metrics['accuracy']['real_time'] < 0.85:
    alert('Neural router accuracy dropped below 85%')

if metrics['inference_latency_ms']['p95'] > 100:
    alert('Neural router latency above 100ms')

if metrics['fallback_rate'] > 0.15:
    alert('High fallback rate - neural router may be struggling')
Key Takeaways
- Hybrid routing combines interpretability + learning
- PyTorch neural nets learn complex patterns
- Training from production data improves over time
- A/B testing validates neural routing works
- Continuous retraining keeps models current
Neural routing isn’t about replacing pattern matching - it’s about augmenting it with learned intelligence.
Learn More About Cortex
Want to dive deeper into how Cortex works? Visit the Meet Cortex page to learn about its architecture, capabilities, and how it scales from 1 to 100+ agents on-demand.
Part 10 of the Cortex series - Week 2 complete! Next week: Development journey deep dives.
Monday: Week 1: The First 100 Commits