Building a Coordinator Master from Scratch
Today, we’re going hands-on. I’ll show you how to build a coordinator master - the brain of a Mixture-of-Experts (MoE) system, which receives tasks and decides which specialist handles each one.
By the end, you’ll have a working implementation.
Architecture Overview
A coordinator master needs:
- Task Receiver - Accept incoming tasks
- Router - Decide which specialist to use
- Worker Spawner - Create execution workers
- Progress Tracker - Monitor task status
- Learning System - Improve over time
Let’s build each component.
Project Setup
# Create the project directory and install the two runtime dependencies
# used below (express for the HTTP API, uuid for task/worker ids).
mkdir moe-coordinator
cd moe-coordinator
npm init -y
npm install express uuid
Project Structure:
moe-coordinator/
├── coordinator.js # Main coordinator logic
├── router.js # Task routing algorithm
├── worker-manager.js # Worker lifecycle
├── patterns.js # Pattern learning
├── server.js # HTTP API
└── coordination/
├── tasks.jsonl # Log of submitted tasks
├── workers.jsonl # Worker spawn/completion events
├── patterns.jsonl # Learned patterns
└── master-metrics.json # Per-master success/quality/duration metrics
Step 1: Task Receiver
server.js - HTTP API for task submission:
const express = require('express');
const { Coordinator } = require('./coordinator');
const app = express();
app.use(express.json());

const coordinator = new Coordinator();

// Port can be overridden via the environment; defaults to 3000.
const PORT = Number(process.env.PORT) || 3000;

/**
 * POST /api/tasks - submit a task for routing and execution.
 * Body: { description: string (required), priority?: string (default 'medium') }.
 * Responds as soon as the task is stored; processing continues in the background.
 */
app.post('/api/tasks', async (req, res) => {
  const { description, priority = 'medium' } = req.body ?? {};

  // Reject bad input up front: routing lower-cases the description, so a
  // missing one would otherwise surface later as an opaque per-task error.
  if (typeof description !== 'string' || description.trim() === '') {
    return res.status(400).json({ error: 'description must be a non-empty string' });
  }

  try {
    // submitted_at is assigned by the coordinator itself.
    const task = await coordinator.submitTask({ description, priority });
    res.json({
      task_id: task.id,
      status: 'queued',
      message: 'Task submitted successfully'
    });
  } catch (error) {
    // Express 4 does not forward rejected async handlers to error middleware;
    // without this catch a failed submission (e.g. log write) leaves the request hanging.
    console.error('Task submission failed:', error);
    res.status(500).json({ error: 'Failed to submit task' });
  }
});

/**
 * GET /api/tasks/:taskId - current state of a task.
 * 404 with { error: 'Task not found' } for unknown ids.
 */
app.get('/api/tasks/:taskId', async (req, res) => {
  try {
    const task = await coordinator.getTaskStatus(req.params.taskId);
    // getTaskStatus returns { error: 'Task not found' } (no id) for unknown ids.
    if (!task || task.id === undefined) {
      return res.status(404).json(task ?? { error: 'Task not found' });
    }
    res.json(task);
  } catch (error) {
    console.error('Task status lookup failed:', error);
    res.status(500).json({ error: 'Failed to read task status' });
  }
});

app.listen(PORT, () => {
  console.log(`Coordinator listening on port ${PORT}`);
});
Step 2: Core Coordinator
coordinator.js - Main orchestration logic:
const { Router } = require('./router');
const { WorkerManager } = require('./worker-manager');
const { PatternLearner } = require('./patterns');
const { v4: uuidv4 } = require('uuid');
const fs = require('fs').promises;
/**
 * Central orchestrator: accepts tasks, routes each one to a specialist master,
 * spawns a worker for it, waits for the result, and feeds the outcome to the
 * pattern learner.
 *
 * Task state lives in the in-memory `tasks` Map. coordination/tasks.jsonl only
 * records submissions; nothing in this class reads it back.
 */
class Coordinator {
  constructor() {
    this.router = new Router();
    this.workerManager = new WorkerManager();
    this.patternLearner = new PatternLearner();
    /** @type {Map<string, object>} task id -> task record (mutated in place) */
    this.tasks = new Map();
  }

  /**
   * Registers a task, logs it, and schedules processing on a later tick.
   * @param {object} taskData - caller fields such as description and priority.
   *   `id`, `status` and `submitted_at` are always assigned here; any
   *   caller-supplied values for them are ignored.
   * @returns {Promise<object>} the stored task record
   */
  async submitTask(taskData) {
    // Drop a caller-supplied id so it cannot replace the generated one.
    const { id: _ignoredId, ...fields } = taskData ?? {};
    const task = {
      id: uuidv4(),
      ...fields,
      status: 'queued',
      submitted_at: new Date()
    };

    this.tasks.set(task.id, task);
    await this.appendToLog('coordination/tasks.jsonl', task);

    // processTask handles its own errors, so the returned promise is not awaited.
    setImmediate(() => this.processTask(task.id));
    return task;
  }

  /**
   * Runs one task through route -> spawn -> wait -> learn, updating the task
   * record (held by reference in `this.tasks`) as it goes.
   * Final status: 'completed' / 'failed' from the worker result, or 'error' if
   * routing, spawning or waiting throws.
   * @param {string} taskId
   */
  async processTask(taskId) {
    const task = this.tasks.get(taskId);
    if (!task) return;

    try {
      // Step 1: Route to appropriate master
      const routing = await this.router.route(task);
      task.routing = routing;
      task.status = 'routing_complete';

      // Step 2: Spawn worker
      const worker = await this.workerManager.spawnWorker({
        taskId: task.id,
        master: routing.selected_master,
        priority: task.priority
      });
      task.worker_id = worker.id;
      task.status = 'in_progress';

      // Step 3: Wait for completion
      const result = await this.workerManager.waitForCompletion(worker.id);
      task.result = result;
      task.status = result.success ? 'completed' : 'failed';
      task.completed_at = new Date();

      // Step 4: Learn from outcome. Learning is best-effort: a failure here
      // must not turn an already-finished task into 'error'.
      try {
        await this.patternLearner.learn(task, routing, result);
      } catch (learnError) {
        console.warn(`Pattern learning failed for task ${taskId}:`, learnError.message);
      }
    } catch (error) {
      task.status = 'error';
      task.error = error.message;
    }
  }

  /**
   * @param {string} taskId
   * @returns {Promise<object>} the task record, or { error: 'Task not found' }
   */
  async getTaskStatus(taskId) {
    return this.tasks.get(taskId) || { error: 'Task not found' };
  }

  /** Appends `data` as one JSON line to `filepath`. */
  async appendToLog(filepath, data) {
    await fs.appendFile(filepath, JSON.stringify(data) + '\n');
  }
}
module.exports = { Coordinator };
Step 3: Task Router
router.js - Routing algorithm:
const fs = require('fs').promises;
/**
 * Scores each specialist master against a task and picks the best fit.
 *
 * confidence = 0.5 base
 *            + up to 0.3 for the share of description words that are the master's keywords
 *            + up to 0.2 from the average success of similar learned patterns
 * capped at 1.0.
 */
class Router {
  constructor() {
    this.masters = [
      { id: 'development-master', domain: 'development', keywords: ['implement', 'fix', 'refactor', 'bug'] },
      { id: 'security-master', domain: 'security', keywords: ['scan', 'CVE', 'vulnerability', 'audit'] },
      { id: 'cicd-master', domain: 'cicd', keywords: ['deploy', 'build', 'test', 'release'] }
    ];
    // Learned patterns; reloaded from coordination/patterns.jsonl on every route().
    this.patterns = [];
  }

  /**
   * Picks the master with the highest confidence for `task`.
   * @param {{id: string, description: string}} task
   * @returns {Promise<object>} { task_id, selected_master, confidence,
   *   alternatives (remaining masters, highest confidence first), timestamp }
   */
  async route(task) {
    // Load learned patterns
    await this.loadPatterns();

    // Calculate confidence for each master
    const confidenceScores = this.masters.map(master => ({
      master: master.id,
      confidence: this.calculateConfidence(task, master)
    }));

    // Sort is stable, so on ties the master listed first in this.masters wins.
    confidenceScores.sort((a, b) => b.confidence - a.confidence);
    const selected = confidenceScores[0];

    return {
      task_id: task.id,
      selected_master: selected.master,
      confidence: selected.confidence,
      alternatives: confidenceScores.slice(1),
      timestamp: new Date()
    };
  }

  /**
   * @param {{description: string}} task
   * @param {{id: string, keywords: string[]}} master
   * @returns {number} confidence in [0.5, 1.0]; never NaN
   */
  calculateConfidence(task, master) {
    let confidence = 0.5; // Base confidence

    // Keyword matching on whole words, case-insensitively, with no length
    // filter, so short or upper-case keywords such as 'fix', 'bug' and 'CVE'
    // can match. Guarded so an empty description does not divide by zero.
    const tokens = this.tokenize(task.description);
    if (tokens.length > 0) {
      const domainKeywords = new Set(master.keywords.map(kw => kw.toLowerCase()));
      const keywordMatches = tokens.filter(token => domainKeywords.has(token)).length;
      confidence += (keywordMatches / tokens.length) * 0.3;
    }

    // Pattern matching (uses the same keyword extraction as PatternLearner)
    const taskKeywords = this.extractKeywords(task.description);
    const matchingPatterns = this.patterns.filter(p =>
      p.master === master.id &&
      this.similarity(p.keywords, taskKeywords) > 0.7
    );
    if (matchingPatterns.length > 0) {
      const avgSuccessRate = matchingPatterns.reduce((sum, p) =>
        sum + this.patternSuccessRate(p), 0) / matchingPatterns.length;
      confidence += avgSuccessRate * 0.2;
    }

    return Math.min(1.0, confidence);
  }

  /**
   * Splits a description into lower-case words, ignoring punctuation
   * (e.g. "CVE-2024-1234," -> ['cve', '2024', '1234']).
   * @param {string} description
   * @returns {string[]}
   */
  tokenize(description) {
    return String(description ?? '')
      .toLowerCase()
      .split(/[^\p{L}\p{N}]+/u)
      .filter(Boolean);
  }

  /**
   * Lower-cased whitespace-separated words longer than 3 characters.
   * Used for pattern similarity.
   */
  extractKeywords(description) {
    return String(description ?? '')
      .toLowerCase()
      .split(/\s+/)
      .filter(word => word.length > 3);
  }

  /**
   * Success rate of one learned pattern: its `success_rate` if numeric,
   * otherwise 1/0 from the boolean `success` field that PatternLearner records.
   */
  patternSuccessRate(pattern) {
    if (Number.isFinite(pattern.success_rate)) {
      return pattern.success_rate;
    }
    return pattern.success ? 1 : 0;
  }

  /** Jaccard similarity of two word lists; 0 when both are empty. */
  similarity(arr1, arr2) {
    const set1 = new Set(arr1);
    const set2 = new Set(arr2);
    const intersection = [...set1].filter(x => set2.has(x));
    const union = new Set([...set1, ...set2]);
    if (union.size === 0) {
      return 0;
    }
    return intersection.length / union.size;
  }

  /**
   * Reloads this.patterns from coordination/patterns.jsonl. A missing file
   * yields no patterns. Malformed lines (e.g. a torn write) are skipped rather
   * than discarding every pattern.
   */
  async loadPatterns() {
    let data;
    try {
      data = await fs.readFile('coordination/patterns.jsonl', 'utf8');
    } catch (error) {
      if (error.code !== 'ENOENT') {
        console.warn('Could not read learned patterns:', error.message);
      }
      this.patterns = [];
      return;
    }

    this.patterns = data
      .split('\n')
      .filter(line => line.trim())
      .flatMap(line => {
        try {
          return [JSON.parse(line)];
        } catch (parseError) {
          return [];
        }
      });
  }
}
module.exports = { Router };
Step 4: Worker Manager
worker-manager.js - Worker lifecycle:
const { spawn } = require('child_process');
const { v4: uuidv4 } = require('uuid');
const fs = require('fs').promises;
/**
 * Tracks worker lifecycle. Execution is simulated in this tutorial: a worker
 * is registered as 'running' immediately and "completes" after a fixed delay
 * with a randomized quality score and duration.
 */
class WorkerManager {
  /**
   * @param {object} [options]
   * @param {number} [options.simulatedDurationMs=2000] - delay before a
   *   simulated worker reports completion.
   */
  constructor({ simulatedDurationMs = 2000 } = {}) {
    /** @type {Map<string, object>} worker id -> worker record */
    this.workers = new Map();
    this.simulatedDurationMs = simulatedDurationMs;
  }

  /**
   * Creates a worker record for a task, logs a 'worker_spawned' event and
   * marks the worker as running.
   * @param {{taskId: string, master: string, priority: string}} params
   * @returns {Promise<object>} the worker record
   */
  async spawnWorker({ taskId, master, priority }) {
    const worker = {
      id: uuidv4(),
      task_id: taskId,
      master,
      priority,
      status: 'spawning',
      spawned_at: new Date()
    };

    // Log worker creation
    await this.appendToLog('coordination/workers.jsonl', {
      event: 'worker_spawned',
      ...worker
    });

    // Simulate spawning a worker process
    // In production, this would spawn actual worker processes
    worker.status = 'running';
    worker.started_at = new Date();
    this.workers.set(worker.id, worker);
    return worker;
  }

  /**
   * Resolves with the worker's result once it finishes (simulated).
   * @param {string} workerId
   * @returns {Promise<{success: boolean, quality_score: number, duration_minutes: number, completed_at: Date}>}
   * @throws {Error} 'Worker not found' (as a rejection) for an unknown id
   */
  async waitForCompletion(workerId) {
    const worker = this.workers.get(workerId);
    if (!worker) {
      throw new Error('Worker not found');
    }

    // Simulate work execution
    // In production, monitor actual worker process
    return new Promise((resolve) => {
      setTimeout(() => {
        const result = {
          success: true,
          quality_score: 0.85 + Math.random() * 0.15,
          duration_minutes: 5 + Math.floor(Math.random() * 20),
          completed_at: new Date()
        };

        // The record is held by reference in this.workers, so mutating it is enough.
        worker.status = 'completed';
        worker.completed_at = new Date();

        // Logging is best-effort and cannot be awaited inside the timer callback.
        // Attach a handler so a failed write does not become an unhandled
        // rejection, which terminates the process under Node's default mode.
        this.appendToLog('coordination/workers.jsonl', {
          event: 'worker_completed',
          worker_id: workerId,
          ...result
        }).catch((error) => {
          console.warn(`Failed to log completion of worker ${workerId}:`, error.message);
        });

        resolve(result);
      }, this.simulatedDurationMs);
    });
  }

  /** Appends `data` as one JSON line to `filepath`. */
  async appendToLog(filepath, data) {
    await fs.appendFile(filepath, JSON.stringify(data) + '\n');
  }
}
module.exports = { WorkerManager };
Step 5: Pattern Learning
patterns.js - Learning system:
const fs = require('fs').promises;
/**
 * Records the outcome of each routed task as a learned pattern
 * (coordination/patterns.jsonl) and keeps aggregate per-master metrics
 * (coordination/master-metrics.json).
 */
class PatternLearner {
  constructor() {
    // Tail of the metrics-update chain. Updates run one at a time so tasks
    // finishing concurrently cannot overwrite each other's read-modify-write.
    this.metricsQueue = Promise.resolve();
  }

  /**
   * @param {{description: string}} task
   * @param {{selected_master: string, confidence: number}} routing
   * @param {{success: boolean, quality_score: number, duration_minutes: number}} result
   */
  async learn(task, routing, result) {
    const pattern = {
      keywords: this.extractKeywords(task.description),
      category: this.categorizeTask(task.description),
      master: routing.selected_master,
      predicted_confidence: routing.confidence,
      success: result.success,
      // Router averages `success_rate` over matching patterns; for a single
      // outcome that is 1 or 0.
      success_rate: result.success ? 1 : 0,
      quality: result.quality_score,
      duration: result.duration_minutes,
      timestamp: new Date()
    };

    // Store pattern
    await this.appendToLog('coordination/patterns.jsonl', pattern);

    // Update master performance metrics
    await this.updateMasterMetrics(routing.selected_master, result);
  }

  /** Lower-cased whitespace-separated words longer than 3 characters. */
  extractKeywords(description) {
    return description
      .toLowerCase()
      .split(/\s+/)
      .filter(word => word.length > 3);
  }

  /** Coarse category by substring match; first matching rule wins. */
  categorizeTask(description) {
    const lower = description.toLowerCase();
    if (lower.includes('fix') || lower.includes('bug')) {
      return 'bugfix';
    }
    if (lower.includes('implement') || lower.includes('add')) {
      return 'feature';
    }
    if (lower.includes('scan') || lower.includes('security')) {
      return 'security';
    }
    if (lower.includes('deploy') || lower.includes('release')) {
      return 'deployment';
    }
    return 'other';
  }

  /**
   * Folds one task result into the master's running totals and derived
   * averages. Calls are serialized through this.metricsQueue.
   * @param {string} masterId
   * @param {{success: boolean, quality_score?: number, duration_minutes?: number}} result
   * @returns {Promise<void>} rejects if this particular update failed
   */
  async updateMasterMetrics(masterId, result) {
    const run = this.metricsQueue.then(() => this.applyMetricsUpdate(masterId, result));
    // Keep the chain alive after a failure; the caller still sees it via `run`.
    this.metricsQueue = run.catch(() => {});
    return run;
  }

  /** Performs one read-modify-write of the metrics file (call via updateMasterMetrics). */
  async applyMetricsUpdate(masterId, result) {
    const file = 'coordination/master-metrics.json';
    const metrics = await this.readMetrics(file);

    // Initialize master if needed
    if (!metrics[masterId]) {
      metrics[masterId] = {
        total_tasks: 0,
        successful_tasks: 0,
        total_quality: 0,
        total_duration: 0
      };
    }
    const entry = metrics[masterId];

    // Update metrics. A missing score/duration counts as 0 instead of
    // turning the stored totals into NaN permanently.
    entry.total_tasks += 1;
    if (result.success) {
      entry.successful_tasks += 1;
    }
    entry.total_quality += result.quality_score ?? 0;
    entry.total_duration += result.duration_minutes ?? 0;

    // Calculate derived metrics (total_tasks >= 1 here)
    entry.success_rate = entry.successful_tasks / entry.total_tasks;
    entry.avg_quality = entry.total_quality / entry.total_tasks;
    entry.avg_duration = entry.total_duration / entry.total_tasks;

    // Write a temp file and rename it over the target so a crash mid-write
    // cannot leave a truncated metrics file behind.
    const tmpFile = `${file}.${process.pid}.tmp`;
    await fs.writeFile(tmpFile, JSON.stringify(metrics, null, 2));
    await fs.rename(tmpFile, file);
  }

  /**
   * Loads the metrics object. A missing file starts fresh silently. An
   * unreadable or corrupt file also starts fresh, but with a warning.
   */
  async readMetrics(file) {
    try {
      const parsed = JSON.parse(await fs.readFile(file, 'utf8'));
      return parsed && typeof parsed === 'object' ? parsed : {};
    } catch (error) {
      if (error.code !== 'ENOENT') {
        console.warn(`Starting fresh: could not read metrics file ${file}:`, error.message);
      }
      return {};
    }
  }

  /** Appends `data` as one JSON line to `filepath`. */
  async appendToLog(filepath, data) {
    await fs.appendFile(filepath, JSON.stringify(data) + '\n');
  }
}
module.exports = { PatternLearner };
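Step 6: Initialize Directories
Create the coordination/ directory before starting the server. Every component appends to files under it, and those writes fail if the directory does not exist.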
# coordinator.js, worker-manager.js and patterns.js write these files using
# paths relative to the directory the server is started from.
# (master-metrics.json is created on first write.)
mkdir -p coordination
touch coordination/tasks.jsonl
touch coordination/workers.jsonl
touch coordination/patterns.jsonl
Running the Coordinator
Start the server:
# Starts the HTTP API (listens on port 3000)
node server.js
Submit a task:
# Submit a task; the JSON response contains the task_id used in the next request
curl -X POST http://localhost:3000/api/tasks \
-H "Content-Type: application/json" \
-d '{"description": "Fix authentication bug in login system", "priority": "high"}'
Check task status:
# Replace TASK_ID with the task_id returned by the submission request
curl http://localhost:3000/api/tasks/TASK_ID
Example Output
Task submission response:
{
"task_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"status": "queued",
"message": "Task submitted successfully"
}
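Task status after completion (illustrative and abbreviated: real worker ids are UUIDs, `routing` also carries `task_id` and `timestamp`, and the simulated worker finishes in about 2 seconds, so real timestamps will be much closer together):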
{
"id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"description": "Fix authentication bug in login system",
"priority": "high",
"status": "completed",
"routing": {
"selected_master": "development-master",
"confidence": 0.82,
"alternatives": [
{"master": "security-master", "confidence": 0.64},
{"master": "cicd-master", "confidence": 0.21}
]
},
"worker_id": "worker-123",
"result": {
"success": true,
"quality_score": 0.91,
"duration_minutes": 18
},
"submitted_at": "2025-12-11T10:00:00Z",
"completed_at": "2025-12-11T10:18:00Z"
}
Extending the Coordinator
Add More Masters
// The master list lives in the Router constructor (this.masters in router.js).
// Keep the existing entries and append the new specialist; each master is
// scored by how many of its keywords appear in the task description.
const masters = [
  { id: 'development-master', domain: 'development', keywords: ['implement', 'fix', 'refactor', 'bug'] },
  { id: 'security-master', domain: 'security', keywords: ['scan', 'CVE', 'vulnerability', 'audit'] },
  { id: 'cicd-master', domain: 'cicd', keywords: ['deploy', 'build', 'test', 'release'] },
  { id: 'docs-master', domain: 'documentation', keywords: ['document', 'readme', 'guide'] }
];
Add Neural Routing
const { NeuralRouter } = require('./neural-router');
/**
 * Hybrid router sketch: blends a pattern-based router with a neural one.
 *
 * NOTE(review): PatternRouter is not defined in this article. It is presumably
 * the keyword/pattern Router from router.js under a new name, imported
 * alongside NeuralRouter. Both are assumed to return the same shape as
 * Router.route(): { task_id, selected_master, confidence, alternatives: [{ master, confidence }] }.
 * TODO confirm for NeuralRouter.
 */
class Router {
  constructor() {
    this.patternRouter = new PatternRouter();
    this.neuralRouter = new NeuralRouter();
  }

  async route(task) {
    // The two routers are independent, so query them in parallel.
    const [patternResult, neuralResult] = await Promise.all([
      this.patternRouter.route(task),
      this.neuralRouter.route(task)
    ]);
    // Hybrid: 60% patterns, 40% neural
    return this.combineScores(patternResult, neuralResult, 0.6, 0.4);
  }

  /**
   * Weighted blend of two routing results, per master. A master missing from
   * one result contributes 0 from that side.
   * @returns {object} routing result in the same shape as the inputs,
   *   alternatives sorted by blended confidence (highest first)
   */
  combineScores(patternResult, neuralResult, patternWeight = 0.6, neuralWeight = 0.4) {
    // Flatten a routing result into master -> confidence.
    const scoresOf = (result) => {
      const scores = new Map();
      if (!result) return scores;
      if (result.selected_master !== undefined) {
        scores.set(result.selected_master, result.confidence ?? 0);
      }
      for (const alt of result.alternatives ?? []) {
        scores.set(alt.master, alt.confidence ?? 0);
      }
      return scores;
    };

    const patternScores = scoresOf(patternResult);
    const neuralScores = scoresOf(neuralResult);
    const allMasters = new Set([...patternScores.keys(), ...neuralScores.keys()]);

    const combined = [...allMasters]
      .map((master) => ({
        master,
        confidence:
          patternWeight * (patternScores.get(master) ?? 0) +
          neuralWeight * (neuralScores.get(master) ?? 0)
      }))
      .sort((a, b) => b.confidence - a.confidence);

    const [best, ...alternatives] = combined;
    return {
      task_id: patternResult?.task_id ?? neuralResult?.task_id,
      selected_master: best?.master,
      confidence: best?.confidence ?? 0,
      alternatives,
      timestamp: new Date()
    };
  }
}
Add Health Monitoring
/**
 * Health-monitoring additions to Coordinator: periodically terminates workers
 * that have been running longer than a timeout.
 *
 * NOTE(review): terminateWorker() is not shown in this article and must be
 * implemented (e.g. by delegating to the WorkerManager).
 */
class Coordinator {
  constructor() {
    // Timeout after 30 minutes
    this.workerTimeoutMs = 30 * 60 * 1000;

    // Start health check daemon (every minute). Catch rejections so a failed
    // termination is logged instead of becoming an unhandled rejection.
    this.healthTimer = setInterval(() => {
      this.checkHealth().catch((error) => console.error('Health check failed:', error));
    }, 60000);
    // Don't let the timer by itself keep the process alive.
    this.healthTimer.unref?.();
  }

  /** Terminates every 'running' worker whose runtime exceeds workerTimeoutMs. */
  async checkHealth() {
    const now = Date.now();
    const timeoutMs = this.workerTimeoutMs ?? 30 * 60 * 1000;

    // Workers are tracked by the WorkerManager, not on the Coordinator itself.
    for (const [workerId, worker] of this.workerManager.workers) {
      if (worker.status !== 'running' || !worker.started_at) continue;

      const runningTime = now - new Date(worker.started_at).getTime();
      if (runningTime > timeoutMs) {
        await this.terminateWorker(workerId);
        console.warn(`Worker ${workerId} timed out`);
      }
    }
  }
}
Production Considerations
Error Handling
async processTask(taskId) {
try {
// ... existing code ...
} catch (error) {
console.error(`Task ${taskId} failed:`, error);
// Retry logic
if (task.retries < 3) {
task.retries = (task.retries || 0) + 1;
setTimeout(() => this.processTask(taskId), 5000 * task.retries);
} else {
task.status = 'failed';
task.error = error.message;
}
}
}
Rate Limiting
/**
 * Rate-limiting additions to Coordinator: cap the number of concurrently
 * running workers.
 */
class Coordinator {
  constructor() {
    this.maxConcurrentWorkers = 20;
    // Spawns that passed the capacity check but whose worker is not yet registered.
    this.pendingSpawns = 0;
    // How often a waiting caller re-checks capacity.
    this.capacityPollMs = 1000;
  }

  /**
   * Workers counting against the limit: those the WorkerManager reports as
   * 'running', plus spawns still in flight.
   * @returns {number}
   */
  getActiveWorkerCount() {
    let running = 0;
    for (const worker of this.workerManager.workers.values()) {
      if (worker.status === 'running') running += 1;
    }
    return running + this.pendingSpawns;
  }

  /** Waits for a free slot, then delegates to WorkerManager.spawnWorker. */
  async spawnWorker(...args) {
    // Wait if at capacity
    while (this.getActiveWorkerCount() >= this.maxConcurrentWorkers) {
      await new Promise(resolve => setTimeout(resolve, this.capacityPollMs ?? 1000));
    }

    // Reserve the slot synchronously right after the check (no await in
    // between). Otherwise callers woken by the same poll could all pass the
    // check before any worker is registered, exceeding the limit.
    this.pendingSpawns += 1;
    try {
      return await this.workerManager.spawnWorker(...args);
    } finally {
      this.pendingSpawns -= 1;
    }
  }
}
Observability
const { performance } = require('perf_hooks');
/**
 * Observability additions to Coordinator: time every task and count failures.
 * NOTE(review): recordMetric() is not shown in this article; it is expected to
 * forward (name, value) to your metrics backend.
 */
class Coordinator {
  async processTask(taskId) {
    const start = performance.now();
    try {
      // ... process task ...
    } catch (error) {
      // Count the failure and keep the error visible instead of swallowing it.
      this.recordMetric('task_failures', 1);
      console.error(`Task ${taskId} failed:`, error);
    } finally {
      // Record the duration for failed tasks too, so latency data is not
      // biased toward successes.
      this.recordMetric('task_duration', performance.now() - start);
    }
  }
}
Key Takeaways
- Coordinator orchestrates - Routes, spawns, monitors
- Router decides - Pattern + confidence scoring
- Worker manager executes - Lifecycle management
- Learning happens - Every task improves routing
- JSONL for logs - Simple, append-only, easy to debug (live task state is held in memory, so it does not survive a restart)
You now have a working MoE coordinator! Tomorrow, we’ll explore how Cortex used itself to transform from commit-relay to Cortex - meta-programming in action.
Learn More About Cortex
Want to dive deeper into how Cortex works? Visit the Meet Cortex page to learn about its architecture, capabilities, and how it scales from 1 to 100+ agents on-demand.
Part 8 of the Cortex series. Next: Meta-Programming: Using Cortex to Build Cortex