mcp-job-queue

tommypj/mcp-job-queue

Summary

Enables MCP clients to submit long-running jobs that are executed safely in isolated child processes with a durable SQLite queue, configurable timeouts, retries with backoff, and backpressure.


A production MCP server + worker daemon for long-running jobs: durable SQLite queue, isolated child-process execution, per-job timeouts, retries with backoff, and backpressure.

LLM agents are great at _deciding_ to do work and terrible at _holding_ it. The moment a tool call kicks off something slow — render a video, transcode audio, crawl a site, call a flaky API — an in-process tool blocks the conversation, and if the process dies the work vanishes with no record. mcp-job-queue is the durable backbone for that: agents submit_job and get an id back instantly; a separate worker daemon runs each job in its own OS process under a hard timeout, retries transient failures, bounds concurrency, and survives restarts. It's the difference between "the agent ran a script" and "the agent dispatched a job to a system that will actually finish it."


---

Features

  • Durable queue — jobs live in SQLite (WAL via the built-in node:sqlite, zero native deps). Submit, crash, restart — nothing is lost.
  • Decoupled server & worker — the MCP server only enqueues/reads; the worker only claims/runs. Either restarts independently; they meet only at the database file.
  • Isolated execution — every job runs in its own forked child process, so a handler that hangs, leaks, or segfaults can't take down the worker.
  • Hard timeouts — a per-job wall-clock timeout that ends in a real SIGKILL, not a hopeful AbortController the job can ignore.
  • Retries with backoff — failed jobs are re-queued with exponential backoff until maxAttempts is reached, then fail terminally.
  • Backpressure — a maxConcurrency cap means a flood of submissions can never exhaust CPU/memory; excess work waits in the queue.
  • Crash recovery — on startup the worker re-queues jobs orphaned mid-run by a previous crash (or fails them if out of attempts).
  • Allowlisted handlers — clients can only submit a registered job type; there is no arbitrary command execution. This is the worker's security boundary.
  • Typed errors & JSON logs — tools return structured {code, message, retryable} instead of throwing; the worker emits one structured log line per job.
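The backpressure rule in the list above comes down to slot arithmetic. The sketch below is a hypothetical in-memory illustration (the names slotsAvailable and jobsToStart are not from the repository): on each poll the worker starts at most as many jobs as it has free slots, and the rest stay queued.

```typescript
// Hypothetical sketch of a concurrency cap; names are illustrative, not the repo's API.

/** Number of jobs the worker may still start without exceeding the cap. */
function slotsAvailable(maxConcurrency: number, running: number): number {
  return Math.max(0, maxConcurrency - running);
}

/** Split the queued ids into those started on this poll and those left waiting. */
function jobsToStart(
  queued: string[],
  maxConcurrency: number,
  running: number,
): { start: string[]; waiting: string[] } {
  const free = slotsAvailable(maxConcurrency, running);
  return { start: queued.slice(0, free), waiting: queued.slice(free) };
}

// Cap of 4 with 2 jobs already running: two of the five queued jobs may start.
const plan = jobsToStart(["a", "b", "c", "d", "e"], 4, 2);
console.log(plan);
```

However many jobs are submitted, the number of running children never exceeds the cap; the queue absorbs the excess.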

---

Architecture

        submit_job / get_job / list_jobs / cancel_job / get_stats
 ┌────────────┐                          ┌──────────────────────┐
 │ MCP client │ ──stdio──▶ ┌──────────┐  │   worker daemon       │
 │ (Claude…)  │            │ MCP       │  │  ┌────────────────┐   │
 └────────────┘            │ server    │  │  │ poll + claim   │   │
                           │ (enqueue/ │  │  │  (BEGIN        │   │
                           │  read)    │  │  │   IMMEDIATE)   │   │
                           └────┬──────┘  │  └───────┬────────┘   │
                                │         │          │ fork       │
                                ▼         │          ▼            │
                       ┌───────────────────────┐  ┌────────────┐  │
                       │   SQLite (WAL) queue   │  │ child proc │  │
                       │  jobs: state machine   │◀─│  handler   │  │
                       └───────────────────────┘  │ (timeout/  │  │
                                ▲                  │  SIGKILL)  │  │
                                │   artifact +     └─────┬──────┘  │
                                │   result/status        │        │
                                └────────────────────────┘  ▼      │
                                                      artifacts/   │
                                                      <id>.json    │
                                                      └────────────┘

The queue is the only shared state. The server process and the worker process never talk directly — they coordinate entirely through atomic SQLite transactions.

---

Production handling, not a demo

Five patterns pulled straight from the source.

1. Atomic claim — a job goes to exactly one worker

The claim is wrapped in BEGIN IMMEDIATE, which takes SQLite's write lock up front. Even with several workers polling the same database, no two can grab the same job.

// db.ts
this.db.exec("BEGIN IMMEDIATE");
try {
  const candidate = this.db
    .prepare(
      `SELECT id FROM jobs WHERE status = 'queued' AND next_run_at <= ?
       ORDER BY priority DESC, created_at ASC LIMIT 1`,
    )
    .get(now);
  if (!candidate) {
    this.db.exec("COMMIT");
    return undefined;
  }
  this.db
    .prepare(`UPDATE jobs SET status = 'running', attempts = attempts + 1, ... WHERE id = ?`)
    .run(/* ... */ candidate.id);
  this.db.exec("COMMIT");
  return this.get(candidate.id);
} catch (err) {
  this.db.exec("ROLLBACK");
  throw err;
}

_Why it matters:_ this is what makes the queue safe to scale horizontally and safe against double-execution — the hardest correctness property a job queue has to get right.
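To make the candidate query concrete, here is a small in-memory sketch of the same selection rule: only queued jobs whose next_run_at has passed are eligible, higher priority wins, and ties go to the oldest job. It illustrates the ordering only. The atomicity comes from BEGIN IMMEDIATE, which an array cannot reproduce. The QueuedJob type and the pickCandidate name are assumptions made for this example.

```typescript
// Hypothetical in-memory mirror of the SELECT's WHERE and ORDER BY clauses.
interface QueuedJob {
  id: string;
  status: "queued" | "running" | "succeeded" | "failed" | "cancelled";
  priority: number;
  createdAt: number;
  nextRunAt: number;
}

function pickCandidate(jobs: QueuedJob[], now: number): QueuedJob | undefined {
  return jobs
    .filter((j) => j.status === "queued" && j.nextRunAt <= now) // WHERE
    .sort((a, b) => b.priority - a.priority || a.createdAt - b.createdAt)[0]; // ORDER BY ... LIMIT 1
}

const jobs: QueuedJob[] = [
  { id: "old-low", status: "queued", priority: 0, createdAt: 100, nextRunAt: 100 },
  { id: "new-high", status: "queued", priority: 5, createdAt: 300, nextRunAt: 300 },
  { id: "old-high", status: "queued", priority: 5, createdAt: 200, nextRunAt: 200 },
  { id: "backing-off", status: "queued", priority: 9, createdAt: 50, nextRunAt: 9999 },
  { id: "busy", status: "running", priority: 9, createdAt: 10, nextRunAt: 10 },
];

console.log(pickCandidate(jobs, 1000)?.id); // highest eligible priority, oldest first
```

The job that is still backing off and the job that is already running are skipped even though they carry the highest priority.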

2. Isolation with a real timeout kill

Each job is a separate process; the timeout ends in SIGKILL, so even a tight CPU loop that ignores cooperative cancellation is stopped.

// runner.ts
const child = spawn(command, [...baseArgs, childScript], { stdio: ["pipe", "pipe", "pipe"] });
const onAbort = () => {
  child.kill("SIGKILL");
  finish({ ok: false, error: "job exceeded timeout and was killed", timedOut: true });
};
signal.addEventListener("abort", onAbort, { once: true });
child.stdin.write(JSON.stringify({ type: job.type, payload: safeParse(job.payload) }));

_Why it matters:_ a worker that can't _guarantee_ it reclaims resources from a stuck job will slowly grind to a halt. Process isolation + SIGKILL is the only reliable answer.
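A compact way to see the hard kill in action, assuming only Node's standard library: the sketch below uses spawnSync with its timeout and killSignal options instead of the repository's spawn plus abort listener. That keeps the example short, but it blocks the event loop, so a real worker would not do it this way. The runWithHardTimeout name and the RunResult shape are hypothetical.

```typescript
import { spawnSync } from "node:child_process";

interface RunResult {
  ok: boolean;
  timedOut: boolean;
  stdout: string;
}

/** Run a JavaScript snippet in a fresh Node process under a hard wall-clock limit. */
function runWithHardTimeout(script: string, timeoutMs: number): RunResult {
  const result = spawnSync(process.execPath, ["-e", script], {
    encoding: "utf8",
    timeout: timeoutMs, // wall-clock limit enforced by the parent
    killSignal: "SIGKILL", // cannot be caught or ignored by the child
  });
  // Simplification: any SIGKILL is treated as a timeout in this sketch.
  const timedOut = result.signal === "SIGKILL";
  return { ok: result.status === 0 && !timedOut, timedOut, stdout: result.stdout ?? "" };
}

const fast = runWithHardTimeout("process.stdout.write('done')", 5000);
const stuck = runWithHardTimeout("while (true) {}", 300); // tight loop, never yields
console.log(fast, stuck);
```

The second call is the interesting one: the child never reaches a point where it could check a cancellation flag, yet the parent still gets control back once the limit expires.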

3. Retry with exponential backoff, or terminal failure

On failure the queue decides — re-queue with growing backoff while attempts remain, otherwise fail terminally. One method, one source of truth.

// db.ts
if (job.attempts < job.max_attempts) {
  const backoff = this.backoffMs(job.attempts); // base * 2^(attempts-1) + jitter
  this.db
    .prepare(`UPDATE jobs SET status = 'queued', error = ?, next_run_at = ? ... WHERE id = ?`)
    .run(errorMessage, now + backoff, /* ... */ id);
  return { job: this.get(id)!, retried: true };
}
this.db
  .prepare(`UPDATE jobs SET status = 'failed', error = ?, finished_at = ? ... WHERE id = ?`)
  .run(errorMessage, now, /* ... */ id);
return { job: this.get(id)!, retried: false };
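The decision above can be sketched as a pure function. The exponential term follows the comment in the excerpt (base * 2^(attempts-1)); the jitter formula is an assumption chosen only to be deterministic, and onFailure and FailureOutcome are hypothetical names.

```typescript
// Hypothetical sketch; the repository's actual jitter formula is not shown in the README.
function backoffMs(attempts: number, baseMs: number): number {
  const exponential = baseMs * 2 ** (attempts - 1);
  const jitter = (attempts * 37) % 100; // assumed: deterministic, 0 to 99 ms
  return exponential + jitter;
}

type FailureOutcome =
  | { status: "queued"; nextRunAt: number }
  | { status: "failed"; finishedAt: number };

/** Decide what happens to a job whose latest attempt just failed. */
function onFailure(
  attempts: number,
  maxAttempts: number,
  now: number,
  baseMs: number,
): FailureOutcome {
  if (attempts < maxAttempts) {
    return { status: "queued", nextRunAt: now + backoffMs(attempts, baseMs) };
  }
  return { status: "failed", finishedAt: now };
}

// Defaults from the configuration table: base delay 500 ms, 3 attempts.
for (let attempts = 1; attempts <= 3; attempts++) {
  console.log(attempts, onFailure(attempts, 3, 0, 500));
}
```

With those defaults the first two failures re-queue the job with a roughly doubling delay, and the third failure is terminal.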

4. Crash recovery on startup

A worker that dies mid-job leaves rows stuck in running. On boot we reclaim them — re-queue if attempts remain, fail otherwise — so a crash never silently strands work.

// db.ts — called once when the worker starts
recoverOrphaned(): number {
  const orphans = this.db.prepare("SELECT * FROM jobs WHERE status = 'running'").all();
  for (const job of orphans) {
    if (job.attempts < job.max_attempts) /* re-queue */;
    else /* terminal fail: "orphaned after worker crash" */;
  }
  return orphans.length;
}

_Why it matters:_ most "simple" queues skip this and quietly lose in-flight jobs on every deploy or crash. Recovery is what makes "durable" actually true.
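The elided branches can be filled in with an in-memory sketch. This operates on a plain array rather than SQLite, and the JobRow shape is an assumption. Only the branch condition and the "orphaned after worker crash" message come from the excerpt above.

```typescript
// Hypothetical in-memory version of the recovery pass; the real one updates SQLite rows.
interface JobRow {
  id: string;
  status: "queued" | "running" | "succeeded" | "failed" | "cancelled";
  attempts: number;
  maxAttempts: number;
  error?: string;
}

function recoverOrphaned(jobs: JobRow[]): number {
  let recovered = 0;
  for (const job of jobs) {
    if (job.status !== "running") continue; // only rows stranded mid-run
    if (job.attempts < job.maxAttempts) {
      job.status = "queued"; // attempts remain: run it again
    } else {
      job.status = "failed"; // out of attempts: terminal failure
      job.error = "orphaned after worker crash";
    }
    recovered++;
  }
  return recovered;
}

const rows: JobRow[] = [
  { id: "a", status: "running", attempts: 1, maxAttempts: 3 },
  { id: "b", status: "running", attempts: 3, maxAttempts: 3 },
  { id: "c", status: "queued", attempts: 0, maxAttempts: 3 },
];
const recovered = recoverOrphaned(rows);
console.log(recovered, rows);
```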

5. Allowlisted handlers — no arbitrary execution

A client can only submit a type that exists in the handler registry. There is no path from an MCP message to an arbitrary shell command.

// handlers.ts
export const HANDLERS: Record<string, JobHandler> = {
  echo: async (payload) => ({ echoed: payload }),
  wait: async (payload) => {
    /* sleep — exercise timeouts */
  },
  hash: async (payload) => ({ digest: sha256(payload.text) }),
  fibonacci: async (payload) => ({ value: fib(payload.n).toString() }),
  fail: async (payload) => {
    throw new Error(/* exercise retries */);
  },
};

_Why it matters:_ "let the agent run a job" must never mean "let the agent run anything." Real work (render/transcode/scrape) is added as a new handler here — the queue machinery around it never changes.
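One detail worth getting right in an allowlist like this is the lookup itself. The sketch below is a hypothetical validator (validateType is not a name from the repository, and marking the error as non-retryable is an assumption) that checks for an own property, so inherited keys such as toString are not mistaken for registered handlers.

```typescript
// Hypothetical sketch of allowlist validation with errors returned as values.
type JobHandler = (payload: unknown) => Promise<unknown>;
type ToolError = { code: string; message: string; retryable: boolean };

const HANDLERS: Record<string, JobHandler> = {
  echo: async (payload) => ({ echoed: payload }),
};

function validateType(type: string): { ok: true } | { ok: false; error: ToolError } {
  // Own-property check: HANDLERS["toString"] exists via the prototype but is not a handler.
  if (Object.prototype.hasOwnProperty.call(HANDLERS, type)) {
    return { ok: true };
  }
  return {
    ok: false,
    error: {
      code: "UNKNOWN_JOB_TYPE",
      message: `no handler registered for "${type}"`,
      retryable: false, // assumed: resubmitting the same type cannot succeed
    },
  };
}

const known = validateType("echo");
const inherited = validateType("toString");
const shell = validateType("rm -rf /");
console.log(known, inherited, shell);
```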

---

Quickstart

Requires Node ≥ 22 (for the built-in node:sqlite).

git clone https://github.com/tommypj/mcp-job-queue.git
cd mcp-job-queue
npm install
npm run build

The system is two processes that share a queue file. Start the worker:

npm run worker        # node --experimental-sqlite dist/worker.js

Then run the MCP server (normally launched by your MCP client, see below):

npm run server        # node --experimental-sqlite dist/server.js  (stdio)

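The --experimental-sqlite flag is required on the Node 22 releases that still gate node:sqlite behind it (the module was unflagged in 22.13) and is accepted harmlessly on newer versions, including Node 24+.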

For local hacking without a build, use the dev scripts: npm run dev:worker and npm run dev:server (run TypeScript directly via tsx).

---

Use it in Claude Desktop / Claude Code

Add this to claude_desktop_config.json (mirrors examples/claude_desktop_config.json) and run the worker separately:

{
  "mcpServers": {
    "job-queue": {
      "command": "node",
      "args": ["--experimental-sqlite", "/absolute/path/to/mcp-job-queue/dist/server.js"],
      "env": {
        "JOBQ_DB_PATH": "/absolute/path/to/queue.db",
        "JOBQ_ARTIFACT_DIR": "/absolute/path/to/artifacts"
      }
    }
  }
}

Config file locations:

  • macOS: ~/Library/Application Support/Claude/claude_desktop_config.json
  • Windows: %APPDATA%\Claude\claude_desktop_config.json
  • Claude Code: claude mcp add job-queue -- node --experimental-sqlite /absolute/path/to/mcp-job-queue/dist/server.js

The server and the worker must point at the same JOBQ_DB_PATH.

---

Tools reference

submit_job(type, payload?, priority?, maxAttempts?, timeoutMs?)

Enqueue a job; returns it in status queued. type must be a registered handler (echo, wait, hash, fibonacci, fail). timeoutMs/maxAttempts are clamped to safe ceilings.

  • Errors: UNKNOWN_JOB_TYPE.
// submit_job { "type": "hash", "payload": { "text": "hello world" } }
{
  "id": "081beee6-…",
  "type": "hash",
  "status": "queued",
  "attempts": 0,
  "maxAttempts": 3,
  "timeoutMs": 30000,
  "payload": { "text": "hello world" }
}
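The clamping mentioned above might look like the following sketch. The defaults and ceilings are taken from the configuration table; the clamp and normalize helpers, and the lower bound of 1, are assumptions for illustration.

```typescript
// Hypothetical sketch of clamping client-supplied limits to safe ceilings.
const limits = {
  defaultTimeoutMs: 30000,
  maxTimeoutMs: 600000,
  defaultMaxAttempts: 3,
  maxAttemptsCeiling: 10,
};

/** Use the fallback when the value is omitted, then force it into [1, ceiling]. */
function clamp(value: number | undefined, fallback: number, ceiling: number): number {
  const requested = value ?? fallback;
  return Math.min(Math.max(1, Math.floor(requested)), ceiling);
}

function normalize(input: { timeoutMs?: number; maxAttempts?: number }) {
  return {
    timeoutMs: clamp(input.timeoutMs, limits.defaultTimeoutMs, limits.maxTimeoutMs),
    maxAttempts: clamp(input.maxAttempts, limits.defaultMaxAttempts, limits.maxAttemptsCeiling),
  };
}

console.log(normalize({})); // defaults apply
console.log(normalize({ timeoutMs: 3600000, maxAttempts: 50 })); // both hit their ceilings
```

Clamping rather than rejecting means an over-ambitious request still produces a job, just one bounded by the operator's limits.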

get_job(id)

Fetch one job: status, result, error, artifact path, timestamps. Errors: JOB_NOT_FOUND.

list_jobs(status?, limit?)

Recent jobs (newest first), optionally filtered by status (queued|running|succeeded|failed|cancelled).

cancel_job(id)

Cancel a still-queued job. Errors: JOB_NOT_FOUND, NOT_CANCELLABLE (running/finished jobs can't be cancelled).
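The cancel rule is a single state-machine transition, queued to cancelled, with every other case returned as a typed error. Below is a minimal sketch over an in-memory Map. The cancelJob signature, the message strings, and the retryable values are assumptions. The error codes are the ones listed above.

```typescript
// Hypothetical sketch of the cancel rule with errors returned as values.
type Status = "queued" | "running" | "succeeded" | "failed" | "cancelled";
type ToolError = {
  code: "JOB_NOT_FOUND" | "NOT_CANCELLABLE";
  message: string;
  retryable: boolean;
};
type CancelResult = { ok: true; status: Status } | { ok: false; error: ToolError };

function cancelJob(jobs: Map<string, Status>, id: string): CancelResult {
  const status = jobs.get(id);
  if (status === undefined) {
    return { ok: false, error: { code: "JOB_NOT_FOUND", message: `no job with id ${id}`, retryable: false } };
  }
  if (status !== "queued") {
    // Running and finished jobs are out of scope for cancellation.
    return { ok: false, error: { code: "NOT_CANCELLABLE", message: `job is ${status}`, retryable: false } };
  }
  jobs.set(id, "cancelled");
  return { ok: true, status: "cancelled" };
}

const jobs = new Map<string, Status>([
  ["a", "queued"],
  ["b", "running"],
]);
const cancelled = cancelJob(jobs, "a");
const refused = cancelJob(jobs, "b");
const missing = cancelJob(jobs, "zzz");
console.log(cancelled, refused, missing);
```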

get_stats()

Queue health: counts by status, total, age of the oldest queued job, and active config.

{
  "countsByStatus": { "queued": 0, "running": 1, "succeeded": 12, "failed": 1, "cancelled": 0 },
  "total": 14,
  "oldestQueuedAgeMs": null,
  "config": {
    "maxConcurrency": 4,
    "defaultTimeoutMs": 30000,
    "registeredHandlers": ["echo", "wait", "hash", "fibonacci", "fail"]
  }
}
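For readers wondering how the first three fields relate to the job rows, here is a hypothetical in-memory computation. It assumes that the age of the oldest queued job is measured from its creation time, which the README does not state, and it omits the config field.

```typescript
// Hypothetical sketch of the stats aggregation over in-memory rows.
type Status = "queued" | "running" | "succeeded" | "failed" | "cancelled";
interface JobRow {
  status: Status;
  createdAt: number; // epoch milliseconds (assumed)
}

function computeStats(jobs: JobRow[], now: number) {
  const countsByStatus: Record<Status, number> = {
    queued: 0, running: 0, succeeded: 0, failed: 0, cancelled: 0,
  };
  let oldestQueuedAt: number | null = null;
  for (const job of jobs) {
    countsByStatus[job.status]++;
    if (job.status === "queued" && (oldestQueuedAt === null || job.createdAt < oldestQueuedAt)) {
      oldestQueuedAt = job.createdAt;
    }
  }
  return {
    countsByStatus,
    total: jobs.length,
    // null when nothing is waiting, matching the example response above
    oldestQueuedAgeMs: oldestQueuedAt === null ? null : now - oldestQueuedAt,
  };
}

const stats = computeStats(
  [
    { status: "queued", createdAt: 1000 },
    { status: "queued", createdAt: 400 },
    { status: "running", createdAt: 100 },
  ],
  1500,
);
const idle = computeStats([{ status: "succeeded", createdAt: 100 }], 1500);
console.log(stats, idle);
```

A steadily growing oldestQueuedAgeMs is the signal that submissions are outpacing the worker's concurrency cap.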

---

Configuration

Environment variables, all prefixed JOBQ_ (see .env.example). The server and worker must share JOBQ_DB_PATH.

| Variable                    | Default                      | Description                                   |
| --------------------------- | ---------------------------- | --------------------------------------------- |
| JOBQ_DB_PATH                | ~/.mcp-job-queue/queue.db    | SQLite (WAL) queue file                       |
| JOBQ_ARTIFACT_DIR           | ~/.mcp-job-queue/artifacts   | Where result artifacts are written            |
| JOBQ_MAX_CONCURRENCY        | 4                            | Max jobs a worker runs at once (backpressure) |
| JOBQ_POLL_INTERVAL_MS       | 250                          | Idle poll interval                            |
| JOBQ_DEFAULT_TIMEOUT_MS     | 30000                        | Default per-job timeout                       |
| JOBQ_MAX_TIMEOUT_MS         | 600000                       | Hard ceiling for a per-job timeout            |
| JOBQ_DEFAULT_MAX_ATTEMPTS   | 3                            | Default attempts incl. the first              |
| JOBQ_MAX_ATTEMPTS_CEILING   | 10                           | Hard ceiling for attempts                     |
| JOBQ_RETRY_BASE_DELAY_MS    | 500                          | Base delay for exponential backoff            |
| JOBQ_LOG_LEVEL              | info                         | debug / info / warn / error                   |
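Reading numeric settings like these usually needs a small parsing helper. The sketch below is hypothetical (envInt is not a name from the repository, and falling back to the default on invalid input is an assumption; the real loader may reject bad values instead). It takes the environment as a parameter so it can be exercised without touching process.env.

```typescript
// Hypothetical sketch of reading a positive-integer setting with a default.
function envInt(
  env: Record<string, string | undefined>,
  name: string,
  fallback: number,
): number {
  const raw = env[name];
  if (raw === undefined || raw.trim() === "") return fallback;
  const parsed = Number(raw);
  // Assumed policy: anything that is not a positive integer falls back to the default.
  return Number.isInteger(parsed) && parsed > 0 ? parsed : fallback;
}

const env = { JOBQ_MAX_CONCURRENCY: "8", JOBQ_POLL_INTERVAL_MS: "fast" };
console.log(envInt(env, "JOBQ_MAX_CONCURRENCY", 4)); // set and valid
console.log(envInt(env, "JOBQ_POLL_INTERVAL_MS", 250)); // set but invalid
console.log(envInt(env, "JOBQ_DEFAULT_TIMEOUT_MS", 30000)); // not set
```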

---

Testing

npm test            # 30 tests (vitest), incl. a real forked-child integration test
npm run lint        # eslint + prettier --check

Coverage targets the production paths: atomic claim + priority ordering, retry-vs-terminal transitions, orphan recovery, cancel rules, queue stats, the worker pool (success, timeout-kill, retry, and a strict concurrency-cap assertion), every handler, the real fork runner (spawns a child, captures failure, SIGKILLs on overrun), and the full MCP tool surface through an in-memory client.

---

Design decisions

  • node:sqlite, not better-sqlite3 — the built-in module means zero native compilation (no node-gyp), which makes the repo trivial to clone and run. WAL mode gives concurrent readers while the worker writes.
  • Two processes, not one — decoupling the MCP server from the worker is the core design choice: it lets the agent-facing surface and the compute surface scale, deploy, and crash independently. The queue file is the contract.
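  • Child process per job, not a worker thread: a separate OS process is the only isolation strong enough to survive native crashes and guarantee a timeout via SIGKILL. Worker threads run inside the worker's own process, so a native crash in one thread takes every job down with it, and terminating a thread gives weaker cleanup guarantees than killing a process.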
  • Handlers are an allowlist — no arbitrary command execution by design; this is the security boundary and the extension point in one. Plugging in real work is a one-function change.
  • Errors as values — tools return typed {code, message, retryable} so an agent can branch programmatically, and the server never crashes on bad input.
  • Cancelling running jobs is intentionally out of scope (v1) — it would require the worker to poll a cancel flag and coordinate the kill; queued-cancel covers the common case cleanly. Documented rather than half-built.
  • Deterministic jitter — retry backoff uses a small deterministic jitter so tests are reproducible while retries still spread; true randomness isn't needed for correctness here.

---

License

MIT © Dan Tomescu. See LICENSE.
