Building a Coordinator Master from Scratch

Ryan Dahlberg
December 15, 2025 9 min read

Today, we’re going hands-on. I’ll show you how to build a coordinator master, the brain of a mixture-of-experts (MoE) system.

By the end, you’ll have a working implementation with simulated workers that you can extend.

Architecture Overview

A coordinator master needs:

  1. Task Receiver - Accept incoming tasks
  2. Router - Decide which specialist to use
  3. Worker Spawner - Create execution workers
  4. Progress Tracker - Monitor task status
  5. Learning System - Improve over time

Let’s build each component.

Project Setup

mkdir moe-coordinator
cd moe-coordinator
npm init -y
npm install express uuid

Project Structure:

moe-coordinator/
├── coordinator.js             # Main coordinator logic
├── router.js                  # Task routing algorithm
├── worker-manager.js          # Worker lifecycle
├── patterns.js                # Pattern learning
├── server.js                  # HTTP API
└── coordination/
    ├── tasks.jsonl            # Task queue
    ├── workers.jsonl          # Worker lifecycle events
    ├── patterns.jsonl         # Learned patterns
    └── master-metrics.json    # Per-master aggregates (written by patterns.js)

Step 1: Task Receiver

server.js - HTTP API for task submission:

const express = require('express');
const { Coordinator } = require('./coordinator');

const app = express();
app.use(express.json());

const coordinator = new Coordinator();

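// Submit task endpoint
app.post('/api/tasks', async (req, res) => {
  const { description, priority = 'medium' } = req.body || {};

  // Reject requests without a usable description
  if (typeof description !== 'string' || !description.trim()) {
    return res.status(400).json({ error: 'description is required' });
  }

  try {
    // The coordinator stamps submitted_at itself
    const task = await coordinator.submitTask({ description, priority });

    res.json({
      task_id: task.id,
      status: 'queued',
      message: 'Task submitted successfully'
    });
  } catch (error) {
    // For example, the coordination/ directory is missing
    res.status(500).json({ error: error.message });
  }
});

// Get task status endpoint
app.get('/api/tasks/:taskId', async (req, res) => {
  const task = await coordinator.getTaskStatus(req.params.taskId);

  // getTaskStatus returns { error: 'Task not found' } for unknown IDs
  if (!task.id) {
    return res.status(404).json(task);
  }
  res.json(task);
});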

app.listen(3000, () => {
  console.log('Coordinator listening on port 3000');
});
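
The submit endpoint takes whatever arrives in the request body. Here is a minimal sketch of a validation step. It assumes a task needs a non-empty string description and one of three priority levels. The `validateTaskInput` name and the `low`/`medium`/`high` set are assumptions; the post itself only uses `medium` and `high`.

```javascript
// Hypothetical helper: validates the body of POST /api/tasks.
// The allowed priority values are an assumption, not taken from the post.
const ALLOWED_PRIORITIES = ['low', 'medium', 'high'];

function validateTaskInput(body) {
  const errors = [];
  const { description, priority = 'medium' } = body || {};

  if (typeof description !== 'string' || description.trim() === '') {
    errors.push('description must be a non-empty string');
  }
  if (!ALLOWED_PRIORITIES.includes(priority)) {
    errors.push(`priority must be one of: ${ALLOWED_PRIORITIES.join(', ')}`);
  }

  // Return the resolved priority so the caller can reuse the default
  return { valid: errors.length === 0, errors, priority };
}
```

In the route handler, a failed check would typically turn into `res.status(400).json({ errors })` before the coordinator is called.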

Step 2: Core Coordinator

coordinator.js - Main orchestration logic:

const { Router } = require('./router');
const { WorkerManager } = require('./worker-manager');
const { PatternLearner } = require('./patterns');
const { v4: uuidv4 } = require('uuid');
const fs = require('fs').promises;

class Coordinator {
  constructor() {
    this.router = new Router();
    this.workerManager = new WorkerManager();
    this.patternLearner = new PatternLearner();
    this.tasks = new Map();
  }

  async submitTask(taskData) {
    const task = {
      id: uuidv4(),
      ...taskData,
      status: 'queued',
      submitted_at: new Date()
    };

    // Store task
    this.tasks.set(task.id, task);
    await this.appendToLog('coordination/tasks.jsonl', task);

    // Start processing asynchronously
    setImmediate(() => this.processTask(task.id));

    return task;
  }

  async processTask(taskId) {
    const task = this.tasks.get(taskId);
    if (!task) return;

    try {
      // Step 1: Route to appropriate master
      const routing = await this.router.route(task);

      task.routing = routing;
      task.status = 'routing_complete';
      this.tasks.set(taskId, task);

      // Step 2: Spawn worker
      const worker = await this.workerManager.spawnWorker({
        taskId: task.id,
        master: routing.selected_master,
        priority: task.priority
      });

      task.worker_id = worker.id;
      task.status = 'in_progress';
      this.tasks.set(taskId, task);

      // Step 3: Wait for completion
      const result = await this.workerManager.waitForCompletion(worker.id);

      task.result = result;
      task.status = result.success ? 'completed' : 'failed';
      task.completed_at = new Date();
      this.tasks.set(taskId, task);

      // Step 4: Learn from outcome
      await this.patternLearner.learn(task, routing, result);

    } catch (error) {
      task.status = 'error';
      task.error = error.message;
      this.tasks.set(taskId, task);
    }
  }

  async getTaskStatus(taskId) {
    return this.tasks.get(taskId) || { error: 'Task not found' };
  }

  async appendToLog(filepath, data) {
    await fs.appendFile(filepath, JSON.stringify(data) + '\n');
  }
}

module.exports = { Coordinator };
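
The coordinator moves each task through a fixed set of statuses: `queued`, `routing_complete`, `in_progress`, then `completed`, `failed`, or `error`. A small sketch of making those transitions explicit follows. The status names come from coordinator.js, but the transition table and the `canTransition` and `setStatus` helpers are assumptions inferred from the order of assignments in `processTask`.

```javascript
// Status values are taken from coordinator.js; the transition table
// itself is an assumption inferred from the order of assignments there.
const TRANSITIONS = {
  queued: ['routing_complete', 'error'],
  routing_complete: ['in_progress', 'error'],
  in_progress: ['completed', 'failed', 'error'],
  completed: [],
  failed: [],
  error: []
};

function canTransition(from, to) {
  return (TRANSITIONS[from] || []).includes(to);
}

// Hypothetical helper: applies a status change or throws on an illegal one.
function setStatus(task, next) {
  if (!canTransition(task.status, next)) {
    throw new Error(`Illegal transition: ${task.status} -> ${next}`);
  }
  task.status = next;
  return task;
}
```

Routing every status change through one function like this makes it easier to catch a task that is processed twice or updated after it has finished.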

Step 3: Task Router

router.js - Routing algorithm:

const fs = require('fs').promises;

class Router {
  constructor() {
    this.masters = [
      { id: 'development-master', domain: 'development', keywords: ['implement', 'fix', 'refactor', 'bug'] },
      { id: 'security-master', domain: 'security', keywords: ['scan', 'CVE', 'vulnerability', 'audit'] },
      { id: 'cicd-master', domain: 'cicd', keywords: ['deploy', 'build', 'test', 'release'] }
    ];
    this.patterns = [];
  }

  async route(task) {
    // Load learned patterns
    await this.loadPatterns();

    // Calculate confidence for each master
    const confidenceScores = this.masters.map(master => ({
      master: master.id,
      confidence: this.calculateConfidence(task, master)
    }));

    // Sort by confidence
    confidenceScores.sort((a, b) => b.confidence - a.confidence);

    const selected = confidenceScores[0];

    return {
      task_id: task.id,
      selected_master: selected.master,
      confidence: selected.confidence,
      alternatives: confidenceScores.slice(1),
      timestamp: new Date()
    };
  }

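  calculateConfidence(task, master) {
    let confidence = 0.5; // Base confidence

    // Keyword matching (case-insensitive, so 'CVE' matches 'cve')
    const taskKeywords = this.extractKeywords(task.description);
    const masterKeywords = master.keywords.map(kw => kw.toLowerCase());
    const keywordMatches = taskKeywords.filter(kw =>
      masterKeywords.includes(kw)
    ).length;

    // Guard against an empty keyword list, which would yield NaN
    if (taskKeywords.length > 0) {
      confidence += (keywordMatches / taskKeywords.length) * 0.3;
    }

    // Pattern matching
    const matchingPatterns = this.patterns.filter(p =>
      p.master === master.id &&
      this.similarity(p.keywords, taskKeywords) > 0.7
    );

    if (matchingPatterns.length > 0) {
      // patterns.jsonl stores a boolean `success` per task, so the
      // success rate is the fraction of matching patterns that succeeded
      const successRate = matchingPatterns.filter(p => p.success).length /
        matchingPatterns.length;

      confidence += successRate * 0.2;
    }

    return Math.min(1.0, confidence);
  }

  extractKeywords(description) {
    // Strip punctuation and keep words of 3+ characters so that short
    // routing keywords such as 'fix', 'bug', and 'cve' survive
    return (description || '')
      .toLowerCase()
      .split(/[^a-z0-9]+/)
      .filter(word => word.length >= 3);
  }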

  similarity(arr1, arr2) {
    const set1 = new Set(arr1);
    const set2 = new Set(arr2);
    const intersection = [...set1].filter(x => set2.has(x));
    const union = new Set([...set1, ...set2]);
    return intersection.length / union.size;
  }

  async loadPatterns() {
    try {
      const data = await fs.readFile('coordination/patterns.jsonl', 'utf8');
      this.patterns = data
        .split('\n')
        .filter(line => line.trim())
        .map(line => JSON.parse(line));
    } catch (error) {
      this.patterns = [];
    }
  }
}

module.exports = { Router };
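
The score is a base of 0.5, plus up to 0.3 for the fraction of task keywords that match the master, plus up to 0.2 for the success rate of similar past patterns, capped at 1.0. Pattern similarity is the Jaccard index of the two keyword lists. The sketch below works through that arithmetic with pure functions. `scoreConfidence` and `jaccard` are hypothetical names, not part of router.js.

```javascript
// Jaccard similarity: |intersection| / |union| of two keyword lists.
function jaccard(a, b) {
  const setA = new Set(a);
  const setB = new Set(b);
  const union = new Set([...setA, ...setB]);
  if (union.size === 0) return 0; // avoid 0/0 for two empty lists
  const shared = [...setA].filter(x => setB.has(x)).length;
  return shared / union.size;
}

// Hypothetical pure version of the scoring arithmetic.
function scoreConfidence(matched, total, successRates) {
  let confidence = 0.5;
  if (total > 0) confidence += (matched / total) * 0.3;
  if (successRates.length > 0) {
    const avg = successRates.reduce((s, r) => s + r, 0) / successRates.length;
    confidence += avg * 0.2;
  }
  return Math.min(1.0, confidence);
}

// 1 of 4 task keywords matched, two similar past patterns (one succeeded):
// 0.5 + (1/4) * 0.3 + 0.5 * 0.2 = 0.675
console.log(scoreConfidence(1, 4, [1, 0]).toFixed(3)); // prints "0.675"

// 2 shared keywords out of 4 distinct keywords overall: 2/4
console.log(jaccard(['deploy', 'build', 'release'], ['deploy', 'build', 'staging'])); // prints 0.5
```

Because the base is 0.5, no master can score below 0.5 under this formula. The keyword and pattern terms only separate candidates within the 0.5 to 1.0 range.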

Step 4: Worker Manager

worker-manager.js - Worker lifecycle:

const { spawn } = require('child_process');
const { v4: uuidv4 } = require('uuid');
const fs = require('fs').promises;

class WorkerManager {
  constructor() {
    this.workers = new Map();
  }

  async spawnWorker({ taskId, master, priority }) {
    const worker = {
      id: uuidv4(),
      task_id: taskId,
      master: master,
      priority: priority,
      status: 'spawning',
      spawned_at: new Date()
    };

    // Log worker creation
    await this.appendToLog('coordination/workers.jsonl', {
      event: 'worker_spawned',
      ...worker
    });

    // Simulate spawning a worker process
    // In production, this would spawn actual worker processes
    worker.status = 'running';
    worker.started_at = new Date();

    this.workers.set(worker.id, worker);

    return worker;
  }

  async waitForCompletion(workerId) {
    const worker = this.workers.get(workerId);
    if (!worker) {
      throw new Error('Worker not found');
    }

    // Simulate work execution
    // In production, monitor actual worker process
    return new Promise((resolve) => {
      setTimeout(() => {
        const result = {
          success: true,
          quality_score: 0.85 + Math.random() * 0.15,
          duration_minutes: 5 + Math.floor(Math.random() * 20),
          completed_at: new Date()
        };

        worker.status = 'completed';
        worker.completed_at = new Date();
        this.workers.set(workerId, worker);

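        // Log completion; catch errors so a failed write cannot
        // surface as an unhandled promise rejection
        this.appendToLog('coordination/workers.jsonl', {
          event: 'worker_completed',
          worker_id: workerId,
          ...result
        }).catch(err => console.error('Failed to log completion:', err));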

        resolve(result);
      }, 2000); // Simulate 2 second execution
    });
  }

  async appendToLog(filepath, data) {
    await fs.appendFile(filepath, JSON.stringify(data) + '\n');
  }
}

module.exports = { WorkerManager };
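
The worker manager above only simulates execution. As a rough sketch of what real process spawning could look like, the helper below wraps Node's `child_process.spawn` in a promise. The `runWorkerProcess` name, the result shape, and the 30-second default timeout are all assumptions; what a worker command actually is depends on your setup.

```javascript
const { spawn } = require('child_process');

// Hypothetical helper: runs a command as a child process and resolves
// with its exit code and captured stdout once the process closes.
function runWorkerProcess(command, args, timeoutMs = 30000) {
  return new Promise((resolve, reject) => {
    const child = spawn(command, args, { stdio: ['ignore', 'pipe', 'inherit'] });
    let stdout = '';

    // Kill the worker if it runs longer than the timeout.
    const timer = setTimeout(() => child.kill('SIGTERM'), timeoutMs);

    child.stdout.setEncoding('utf8');
    child.stdout.on('data', chunk => { stdout += chunk; });

    // 'error' fires when the process could not be started at all.
    child.on('error', err => { clearTimeout(timer); reject(err); });

    // 'close' fires after the process exits and its stdio streams end.
    child.on('close', (code, signal) => {
      clearTimeout(timer);
      resolve({ success: code === 0, code, signal, stdout });
    });
  });
}

// Example: use the current Node binary as a stand-in worker.
runWorkerProcess(process.execPath, ['-e', 'console.log("done")'])
  .then(result => console.log(result.success, result.stdout.trim()));
```

A helper like this could replace the `setTimeout` in `waitForCompletion`, with the worker's exit code mapped to `result.success`.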

Step 5: Pattern Learning

patterns.js - Learning system:

const fs = require('fs').promises;

class PatternLearner {
  async learn(task, routing, result) {
    const pattern = {
      keywords: this.extractKeywords(task.description),
      category: this.categorizeTask(task.description),
      master: routing.selected_master,
      predicted_confidence: routing.confidence,
      success: result.success,
      quality: result.quality_score,
      duration: result.duration_minutes,
      timestamp: new Date()
    };

    // Store pattern
    await this.appendToLog('coordination/patterns.jsonl', pattern);

    // Update master performance metrics
    await this.updateMasterMetrics(routing.selected_master, result);
  }

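  extractKeywords(description) {
    // Strip punctuation and keep words of 3+ characters so short
    // keywords like 'fix' and 'bug' are recorded in the pattern
    return (description || '')
      .toLowerCase()
      .split(/[^a-z0-9]+/)
      .filter(word => word.length >= 3);
  }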

  categorizeTask(description) {
    const lower = description.toLowerCase();

    if (lower.includes('fix') || lower.includes('bug')) {
      return 'bugfix';
    }
    if (lower.includes('implement') || lower.includes('add')) {
      return 'feature';
    }
    if (lower.includes('scan') || lower.includes('security')) {
      return 'security';
    }
    if (lower.includes('deploy') || lower.includes('release')) {
      return 'deployment';
    }

    return 'other';
  }

  async updateMasterMetrics(masterId, result) {
    // Load current metrics
    let metrics = {};
    try {
      const data = await fs.readFile('coordination/master-metrics.json', 'utf8');
      metrics = JSON.parse(data);
    } catch (error) {
      metrics = {};
    }

    // Initialize master if needed
    if (!metrics[masterId]) {
      metrics[masterId] = {
        total_tasks: 0,
        successful_tasks: 0,
        total_quality: 0,
        total_duration: 0
      };
    }

    // Update metrics
    metrics[masterId].total_tasks += 1;
    if (result.success) {
      metrics[masterId].successful_tasks += 1;
    }
    metrics[masterId].total_quality += result.quality_score;
    metrics[masterId].total_duration += result.duration_minutes;

    // Calculate derived metrics
    metrics[masterId].success_rate =
      metrics[masterId].successful_tasks / metrics[masterId].total_tasks;
    metrics[masterId].avg_quality =
      metrics[masterId].total_quality / metrics[masterId].total_tasks;
    metrics[masterId].avg_duration =
      metrics[masterId].total_duration / metrics[masterId].total_tasks;

    // Save updated metrics
    await fs.writeFile(
      'coordination/master-metrics.json',
      JSON.stringify(metrics, null, 2)
    );
  }

  async appendToLog(filepath, data) {
    await fs.appendFile(filepath, JSON.stringify(data) + '\n');
  }
}

module.exports = { PatternLearner };
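
Each line in patterns.jsonl records one task outcome with a boolean `success` field, so any success rate has to be computed from the log. The sketch below shows one way to aggregate it per master. The `successRateByMaster` name is hypothetical; the `master` and `success` field names are the ones written by `learn`.

```javascript
// Parses JSONL text (one JSON object per line) and computes a per-master
// success rate. Field names (`master`, `success`) match patterns.js.
function successRateByMaster(jsonlText) {
  const totals = {};
  for (const line of jsonlText.split('\n')) {
    if (!line.trim()) continue; // skip blank lines
    let pattern;
    try {
      pattern = JSON.parse(line);
    } catch (err) {
      continue; // skip a corrupt or partially written line
    }
    const entry = totals[pattern.master] || { total: 0, successes: 0 };
    entry.total += 1;
    if (pattern.success) entry.successes += 1;
    totals[pattern.master] = entry;
  }

  const rates = {};
  for (const [master, { total, successes }] of Object.entries(totals)) {
    rates[master] = successes / total;
  }
  return rates;
}
```

Skipping unparseable lines is a deliberate choice here: an append-only log can end with a half-written line if the process dies mid-write, and one bad line should not discard the rest.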

Step 6: Initialize Directories

mkdir -p coordination
touch coordination/tasks.jsonl
touch coordination/workers.jsonl
touch coordination/patterns.jsonl
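
If you would rather not depend on a manual setup step, the same initialization can be done from Node at startup. This is a sketch, not part of the original project. `initCoordinationDir` and its `baseDir` parameter are hypothetical.

```javascript
const fs = require('fs');
const path = require('path');

// Creates the coordination directory and empty log files if missing.
// `baseDir` is a hypothetical parameter; the post uses the working directory.
function initCoordinationDir(baseDir = process.cwd()) {
  const dir = path.join(baseDir, 'coordination');
  fs.mkdirSync(dir, { recursive: true }); // no error if it already exists

  for (const name of ['tasks.jsonl', 'workers.jsonl', 'patterns.jsonl']) {
    // Flag 'a' creates the file when absent and never truncates existing data.
    fs.closeSync(fs.openSync(path.join(dir, name), 'a'));
  }
  return dir;
}
```

Calling this once before `app.listen` is safe to repeat on every start, since existing log contents are left untouched.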

Running the Coordinator

Start the server:

node server.js

Submit a task:

curl -X POST http://localhost:3000/api/tasks \
  -H "Content-Type: application/json" \
  -d '{"description": "Fix authentication bug in login system", "priority": "high"}'

Check task status:

curl http://localhost:3000/api/tasks/TASK_ID

Example Output

Task submission response:

{
  "task_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "status": "queued",
  "message": "Task submitted successfully"
}

Task status after completion (illustrative values; actual IDs, scores, and timestamps will differ):

{
  "id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "description": "Fix authentication bug in login system",
  "priority": "high",
  "status": "completed",
  "routing": {
    "selected_master": "development-master",
    "confidence": 0.82,
    "alternatives": [
      {"master": "security-master", "confidence": 0.64},
      {"master": "cicd-master", "confidence": 0.21}
    ]
  },
  "worker_id": "worker-123",
  "result": {
    "success": true,
    "quality_score": 0.91,
    "duration_minutes": 18
  },
  "submitted_at": "2025-12-11T10:00:00Z",
  "completed_at": "2025-12-11T10:18:00Z"
}

Extending the Coordinator

Add More Masters

// In the Router constructor (router.js), append a new entry to this.masters:
this.masters = [
  { id: 'development-master', domain: 'development', keywords: [/* ... */] },
  { id: 'security-master', domain: 'security', keywords: [/* ... */] },
  { id: 'cicd-master', domain: 'cicd', keywords: [/* ... */] },
  { id: 'docs-master', domain: 'documentation', keywords: ['document', 'readme', 'guide'] }
];

Add Neural Routing

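// Sketch only: './neural-router', PatternRouter, and combineScores are
// not implemented in this post. PatternRouter stands in for the Step 3 Router.
const { NeuralRouter } = require('./neural-router');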

class Router {
  constructor() {
    this.patternRouter = new PatternRouter();
    this.neuralRouter = new NeuralRouter();
  }

  async route(task) {
    const patternScore = await this.patternRouter.route(task);
    const neuralScore = await this.neuralRouter.route(task);

    // Hybrid: 60% patterns, 40% neural
    return this.combineScores(patternScore, neuralScore, 0.6, 0.4);
  }
}
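
The hybrid router calls `combineScores` without defining it. One possible shape is a weighted sum per master, sketched below. It assumes each router returns a plain object mapping master ID to confidence, which differs from the `route` return value in Step 3, so treat the input shape as hypothetical.

```javascript
// Hypothetical shape: each router returns { [masterId]: confidence }.
function combineScores(patternScores, neuralScores, patternWeight, neuralWeight) {
  const masters = new Set([
    ...Object.keys(patternScores),
    ...Object.keys(neuralScores)
  ]);
  if (masters.size === 0) {
    throw new Error('No scores to combine');
  }

  const combined = [...masters].map(master => ({
    master,
    // A master missing from one router contributes 0 from that router.
    confidence:
      (patternScores[master] || 0) * patternWeight +
      (neuralScores[master] || 0) * neuralWeight
  }));

  // Highest combined confidence first
  combined.sort((a, b) => b.confidence - a.confidence);

  return {
    selected_master: combined[0].master,
    confidence: combined[0].confidence,
    alternatives: combined.slice(1)
  };
}
```

With weights 0.6 and 0.4, a master scoring 0.6 on patterns and 0.9 on the neural router (0.72 combined) would beat one scoring 0.8 and 0.5 (0.68 combined).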

Add Health Monitoring

class Coordinator {
  constructor() {
    // ... existing setup ...

    // Start health check daemon
    setInterval(() => this.checkHealth(), 60000); // Every minute
  }

  async checkHealth() {
    // Workers live on the WorkerManager, not on the Coordinator
    for (const [workerId, worker] of this.workerManager.workers) {
      const runningTime = Date.now() - worker.started_at;

      // Timeout after 30 minutes
      if (runningTime > 30 * 60 * 1000 && worker.status === 'running') {
        // terminateWorker is not implemented in this post
        await this.workerManager.terminateWorker(workerId);
        console.warn(`Worker ${workerId} timed out`);
      }
    }
  }
}
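
The timeout check itself is easy to isolate from the timer and the termination logic, which makes it testable without waiting 30 minutes. A sketch follows. It assumes the worker map shape from worker-manager.js; `findTimedOutWorkers` is a hypothetical helper.

```javascript
const WORKER_TIMEOUT_MS = 30 * 60 * 1000; // 30 minutes, as in the post

// Returns the IDs of running workers whose age exceeds the timeout.
// `workers` is a Map of id -> { status, started_at } as in worker-manager.js.
function findTimedOutWorkers(workers, now = Date.now(), timeoutMs = WORKER_TIMEOUT_MS) {
  const timedOut = [];
  for (const [workerId, worker] of workers) {
    // Accept either a Date or a timestamp for started_at
    const startedAt = new Date(worker.started_at).getTime();
    if (worker.status === 'running' && now - startedAt > timeoutMs) {
      timedOut.push(workerId);
    }
  }
  return timedOut;
}
```

Passing `now` as a parameter lets a test supply a fixed clock instead of relying on real elapsed time.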

Production Considerations

Error Handling

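async processTask(taskId) {
  const task = this.tasks.get(taskId);
  if (!task) return;

  try {
    // ... existing code ...
  } catch (error) {
    console.error(`Task ${taskId} failed:`, error);

    // Retry up to 3 times with a linearly increasing delay (5s, 10s, 15s).
    // Default retries to 0 first: `undefined < 3` is false and would skip retries.
    task.retries = task.retries || 0;
    if (task.retries < 3) {
      task.retries += 1;
      setTimeout(() => this.processTask(taskId), 5000 * task.retries);
    } else {
      task.status = 'failed';
      task.error = error.message;
    }
  }
}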
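
The retry policy here is three attempts with a delay that grows by 5 seconds each time. Pulling that decision into a small function keeps the catch block short. This is a sketch. `nextRetryDelay` and the constant names are hypothetical, while the limit of 3 and the 5000 ms step come from the snippet above.

```javascript
const MAX_RETRIES = 3;        // matches the limit in the post
const BASE_DELAY_MS = 5000;   // matches the 5000 ms multiplier in the post

// Returns the delay before the next attempt, or null when retries are exhausted.
// Mutates task.retries so the count persists across attempts.
function nextRetryDelay(task) {
  const retries = task.retries || 0; // treat a missing counter as zero
  if (retries >= MAX_RETRIES) return null;
  task.retries = retries + 1;
  return BASE_DELAY_MS * task.retries;
}

const exampleTask = { id: 'example' };
console.log(nextRetryDelay(exampleTask)); // prints 5000
console.log(nextRetryDelay(exampleTask)); // prints 10000
console.log(nextRetryDelay(exampleTask)); // prints 15000
console.log(nextRetryDelay(exampleTask)); // prints null
```

A `null` result is the signal to mark the task as `failed` instead of scheduling another attempt.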

Rate Limiting

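class Coordinator {
  constructor() {
    // ... existing setup ...
    this.maxConcurrentWorkers = 20;
  }

  // Count workers the WorkerManager currently marks as running
  getActiveWorkerCount() {
    let count = 0;
    for (const worker of this.workerManager.workers.values()) {
      if (worker.status === 'running') count += 1;
    }
    return count;
  }

  async spawnWorker(...args) {
    // Wait if at capacity, polling once per second
    while (this.getActiveWorkerCount() >= this.maxConcurrentWorkers) {
      await new Promise(resolve => setTimeout(resolve, 1000));
    }

    return this.workerManager.spawnWorker(...args);
  }
}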
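
The polling loop wakes up once per second, and several waiting callers can pass the capacity check at the same moment. A counting semaphore is one alternative that avoids both. The sketch below is an assumption about how you might structure it, not code from Cortex. The `Semaphore` class is hypothetical.

```javascript
// Hypothetical counting semaphore: at most `limit` holders at a time.
class Semaphore {
  constructor(limit) {
    this.limit = limit;
    this.active = 0;
    this.waiters = []; // resolve callbacks of callers waiting for a slot
  }

  // Resolves immediately if a slot is free, otherwise when one is released.
  acquire() {
    if (this.active < this.limit) {
      this.active += 1;
      return Promise.resolve();
    }
    return new Promise(resolve => this.waiters.push(resolve));
  }

  // Hands the slot directly to the oldest waiter, or frees it.
  release() {
    const next = this.waiters.shift();
    if (next) {
      next(); // slot stays counted as active for the new holder
    } else {
      this.active -= 1;
    }
  }
}

// Hypothetical usage inside the coordinator:
//   await slots.acquire();
//   try { /* spawn worker and wait for completion */ } finally { slots.release(); }
```

Releasing in a `finally` block matters: a worker that throws would otherwise hold its slot forever and slowly shrink capacity.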

Observability

const { performance } = require('perf_hooks');

class Coordinator {
  async processTask(taskId) {
    const start = performance.now();

    try {
      // ... process task ...

      const duration = performance.now() - start;
      this.recordMetric('task_duration', duration);
    } catch (error) {
      this.recordMetric('task_failures', 1);
    }
  }
}
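
`recordMetric` is called above but never defined. As a starting point, a minimal in-memory version could look like the sketch below. The `Metrics` class and its method names are assumptions; a production setup would more likely forward these values to a metrics backend.

```javascript
// Hypothetical in-memory metrics store keyed by metric name.
class Metrics {
  constructor() {
    this.series = new Map();
  }

  // Adds one observation to the named metric.
  record(name, value) {
    const s = this.series.get(name) ||
      { count: 0, sum: 0, min: Infinity, max: -Infinity };
    s.count += 1;
    s.sum += value;
    s.min = Math.min(s.min, value);
    s.max = Math.max(s.max, value);
    this.series.set(name, s);
  }

  // Returns aggregate stats, or null if nothing was recorded.
  summary(name) {
    const s = this.series.get(name);
    if (!s) return null;
    return { count: s.count, sum: s.sum, avg: s.sum / s.count, min: s.min, max: s.max };
  }
}
```

The coordinator could hold one instance and implement `recordMetric(name, value)` as a call to `this.metrics.record(name, value)`. Note that `performance.now()` differences are in milliseconds.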

Key Takeaways

  1. Coordinator orchestrates - Routes, spawns, monitors
  2. Router decides - Keyword matching plus learned patterns produce a confidence score
  3. Worker manager executes - Lifecycle management (simulated here)
  4. Learning happens - Every completed task is logged as a pattern for future routing
  5. JSONL for state - Append-only, simple, easy to inspect

You now have a working MoE coordinator! Tomorrow, we’ll explore how Cortex used itself to transform from commit-relay to Cortex - meta-programming in action.


Part 8 of the Cortex series. Next: Meta-Programming: Using Cortex to Build Cortex
