diff --git a/docs/content/docs/node/guides/core/index.mdx b/docs/content/docs/node/guides/core/index.mdx
new file mode 100644
index 00000000..18933667
--- /dev/null
+++ b/docs/content/docs/node/guides/core/index.mdx
@@ -0,0 +1,18 @@
+---
+title: Core
+description: "The building blocks of every Node.js taskito application."
+---
+
+The building blocks of every taskito application.
+
+| Guide | Description |
+|---|---|
+| [Tasks](/node/guides/core/tasks) | Register tasks with `queue.task()`, configure retries, timeouts, and options |
+| [Queues & Priority](/node/guides/core/queues) | Named queues, priority levels, and routing |
+| [Workers](/node/guides/core/workers) | Start workers, control concurrency, graceful shutdown |
+| [Execution Models](/node/guides/core/execution-model) | How tasks move from enqueue to completion |
+| [Enqueue options](/node/guides/core/enqueue-options) | Per-job delay, priority, uniqueness, and metadata |
+| [Predicates](/node/guides/core/predicates) | Gate an enqueue on a runtime condition |
+| [Scheduling](/node/guides/core/scheduling) | Periodic tasks with cron expressions |
+| [Cancellation](/node/guides/core/cancellation) | Cancel queued and in-flight jobs |
+| [Streaming partial results](/node/guides/core/streaming) | Emit progress from a running task |
diff --git a/docs/content/docs/node/guides/extensibility/index.mdx b/docs/content/docs/node/guides/extensibility/index.mdx
new file mode 100644
index 00000000..d7975634
--- /dev/null
+++ b/docs/content/docs/node/guides/extensibility/index.mdx
@@ -0,0 +1,13 @@
+---
+title: Extensibility
+description: "Extend the Node.js SDK at every stage of the task lifecycle."
+---
+
+Extend taskito with custom behavior at every stage of the task lifecycle.
+
+| Guide | Description |
+|---|---|
+| [Events](/node/guides/extensibility/events) | Subscribe to queue and worker lifecycle events |
+| [Per-Task Middleware](/node/guides/extensibility/middleware) | Wrap task execution with before/after/error hooks |
+| [Webhooks](/node/guides/extensibility/webhooks) | Push signed notifications to external services |
+| [Pluggable Serializers](/node/guides/extensibility/serializers) | Custom payload serialization — msgpack, encryption |
diff --git a/docs/content/docs/node/guides/index.mdx b/docs/content/docs/node/guides/index.mdx
new file mode 100644
index 00000000..8fbe9f8b
--- /dev/null
+++ b/docs/content/docs/node/guides/index.mdx
@@ -0,0 +1,70 @@
+---
+title: Guides
+description: Topical guides covering the Node.js SDK's surface area.
+---
+
+import { Cards, Card } from "fumadocs-ui/components/card";
+import {
+ Boxes,
+ ShieldCheck,
+ Wrench,
+ Activity,
+ Layers,
+ Workflow,
+ Plug,
+ Puzzle,
+} from "lucide-react";
+
+Pick the topic you're working through. Each guide is self-contained, with code
+samples, gotchas, and links to the relevant API reference.
+
+
+ }
+ title="Core"
+ href="/node/guides/core"
+ description="Define tasks, route to queues, run workers, schedule periodics."
+ />
+ }
+ title="Reliability"
+ href="/node/guides/reliability"
+ description="Retries, dead letters, circuit breakers, idempotency, error handling."
+ />
+ }
+ title="Workflows"
+ href="/node/guides/workflows"
+ description="DAGs, fan-out, fan-in, conditions, gates, sub-workflows, saga compensation."
+ />
+ }
+ title="Extensibility"
+ href="/node/guides/extensibility"
+ description="Custom serializers, middleware, events, webhooks."
+ />
+ }
+ title="Resources"
+ href="/node/guides/resources"
+ description="Inject DB clients, HTTP sessions, and cloud SDKs by name; intercept enqueues."
+ />
+ }
+ title="Integrations"
+ href="/node/guides/integrations"
+ description="Observability and web-framework helpers for the Node.js SDK."
+ />
+ }
+ title="Operations"
+ href="/node/guides/operations"
+ description="Deployment, scaling, mesh, KEDA, migration from BullMQ."
+ />
+ }
+ title="Observability"
+ href="/node/guides/observability"
+ description="Monitor queue health, the dashboard, and structured logs."
+ />
+
diff --git a/docs/content/docs/node/guides/meta.json b/docs/content/docs/node/guides/meta.json
index d884b213..2a3cf46a 100644
--- a/docs/content/docs/node/guides/meta.json
+++ b/docs/content/docs/node/guides/meta.json
@@ -1 +1,16 @@
-{ "title": "Guides", "root": true, "pages": ["core", "reliability", "workflows", "extensibility", "resources", "integrations", "operations", "observability"] }
+{
+ "title": "Guides",
+ "root": true,
+ "pages": [
+ "index",
+ "core",
+ "reliability",
+ "workflows",
+ "extensibility",
+ "resources",
+ "integrations",
+ "operations",
+ "observability"
+ ]
+}
+
diff --git a/docs/content/docs/node/guides/operations/index.mdx b/docs/content/docs/node/guides/operations/index.mdx
new file mode 100644
index 00000000..8f666d89
--- /dev/null
+++ b/docs/content/docs/node/guides/operations/index.mdx
@@ -0,0 +1,21 @@
+---
+title: Operations
+description: "Run the Node.js SDK reliably in production — deployment, scaling, migration."
+---
+
+Run taskito reliably in production.
+
+| Guide | Description |
+|---|---|
+| [Backends](/node/guides/operations/backends) | SQLite, Postgres, and Redis — setup and trade-offs |
+| [Job Management](/node/guides/operations/inspection) | Inspect, cancel, replay, and clean up jobs |
+| [Dashboard](/node/guides/operations/dashboard) | Serve the web dashboard from a Node worker |
+| [Dashboard REST API](/node/guides/operations/dashboard-api) | The JSON API behind the dashboard |
+| [Mesh Scheduling](/node/guides/operations/mesh) | Decentralized work-stealing across worker nodes |
+| [KEDA Autoscaling](/node/guides/operations/keda) | Kubernetes event-driven autoscaling from queue depth |
+| [CLI](/node/guides/operations/cli) | Manage queues and workers from the command line |
+| [Testing](/node/guides/operations/testing) | Test mode, fixtures, and mocking |
+| [Security](/node/guides/operations/security) | Signing, encryption, and dashboard auth |
+| [Troubleshooting](/node/guides/operations/troubleshooting) | Diagnose stuck jobs and worker issues |
+| [Deployment](/node/guides/operations/deployment) | Docker, process managers, and production checklists |
+| [Migrating from BullMQ](/node/guides/operations/migration) | Side-by-side comparison and migration guide |
diff --git a/docs/content/docs/node/guides/reliability/index.mdx b/docs/content/docs/node/guides/reliability/index.mdx
new file mode 100644
index 00000000..d0d009ef
--- /dev/null
+++ b/docs/content/docs/node/guides/reliability/index.mdx
@@ -0,0 +1,19 @@
+---
+title: Reliability
+description: "Harden your Node.js task queue for production workloads."
+---
+
+Harden your task queue for production workloads.
+
+| Guide | Description |
+|---|---|
+| [Error Handling](/node/guides/reliability/error-handling) | Task failure lifecycle, error inspection, debugging patterns |
+| [Delivery Guarantees](/node/guides/reliability/guarantees) | At-least-once delivery and exactly-once patterns |
+| [Retries](/node/guides/reliability/retries) | Automatic retries with exponential backoff |
+| [Timeouts](/node/guides/reliability/timeouts) | Bound task runtime and reclaim stalled jobs |
+| [Idempotency](/node/guides/reliability/idempotency) | Deduplicate enqueues with unique keys |
+| [Rate Limiting](/node/guides/reliability/rate-limiting) | Throttle task execution with token-bucket limits |
+| [Concurrency](/node/guides/reliability/concurrency) | Cap how many jobs of a task run at once |
+| [Circuit Breakers](/node/guides/reliability/circuit-breakers) | Protect downstream services from cascading failures |
+| [Dead-letter queue](/node/guides/reliability/dead-letter) | Capture and replay exhausted jobs |
+| [Distributed Locking](/node/guides/reliability/locks) | Mutual exclusion across workers with database-backed locks |
diff --git a/sdks/java/src/main/java/org/byteveda/taskito/autoscale/AutoscaleOptions.java b/sdks/java/src/main/java/org/byteveda/taskito/autoscale/AutoscaleOptions.java
new file mode 100644
index 00000000..7ca91c34
--- /dev/null
+++ b/sdks/java/src/main/java/org/byteveda/taskito/autoscale/AutoscaleOptions.java
@@ -0,0 +1,33 @@
+package org.byteveda.taskito.autoscale;
+
+import java.time.Duration;
+
+/**
+ * Tuning for the {@link Autoscaler}.
+ *
+ * @param minWorkers the floor on handler threads (must be ≥ 1)
+ * @param maxWorkers the ceiling on handler threads (must be ≥ {@code minWorkers})
+ * @param tasksPerWorker desired outstanding tasks per worker thread (must be ≥ 1)
+ * @param interval how often to re-evaluate (must be positive)
+ */
+public record AutoscaleOptions(int minWorkers, int maxWorkers, int tasksPerWorker, Duration interval) {
+ public AutoscaleOptions {
+ if (minWorkers < 1) {
+ throw new IllegalArgumentException("minWorkers must be >= 1");
+ }
+ if (maxWorkers < minWorkers) {
+ throw new IllegalArgumentException("maxWorkers must be >= minWorkers");
+ }
+ if (tasksPerWorker < 1) {
+ throw new IllegalArgumentException("tasksPerWorker must be >= 1");
+ }
+ if (interval == null || interval.isNegative() || interval.isZero()) {
+ throw new IllegalArgumentException("interval must be positive");
+ }
+ }
+
+ /** Defaults: scale {@code min..max} threads, ~10 tasks per worker, re-evaluated every 2s. */
+ public static AutoscaleOptions of(int minWorkers, int maxWorkers) {
+ return new AutoscaleOptions(minWorkers, maxWorkers, 10, Duration.ofSeconds(2));
+ }
+}
diff --git a/sdks/java/src/main/java/org/byteveda/taskito/autoscale/Autoscaler.java b/sdks/java/src/main/java/org/byteveda/taskito/autoscale/Autoscaler.java
new file mode 100644
index 00000000..a77f878b
--- /dev/null
+++ b/sdks/java/src/main/java/org/byteveda/taskito/autoscale/Autoscaler.java
@@ -0,0 +1,83 @@
+package org.byteveda.taskito.autoscale;
+
+import java.lang.System.Logger;
+import java.lang.System.Logger.Level;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.LongSupplier;
+
+/**
+ * Periodically resizes a worker's handler pool to {@code ceil(depth /
+ * tasksPerWorker)} clamped to {@code [minWorkers, maxWorkers]}, where depth is
+ * the queue's outstanding work. Growing raises the pool ceiling before its core;
+ * shrinking lowers the core and lets idle threads time out — running handlers are
+ * never interrupted.
+ */
+public final class Autoscaler implements AutoCloseable {
+ private static final Logger LOG = System.getLogger(Autoscaler.class.getName());
+
+ private final ThreadPoolExecutor pool;
+ private final LongSupplier depth;
+ private final AutoscaleOptions options;
+ private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(Autoscaler::daemon);
+
+ public Autoscaler(ThreadPoolExecutor pool, LongSupplier depth, AutoscaleOptions options) {
+ this.pool = pool;
+ this.depth = depth;
+ this.options = options;
+ // Let core threads retire so the pool actually shrinks when idle.
+ pool.allowCoreThreadTimeOut(true);
+ }
+
+ /** Begin periodic resizing. */
+ public void start() {
+ long ms = options.interval().toMillis();
+ scheduler.scheduleAtFixedRate(this::tickSafely, ms, ms, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void close() {
+ scheduler.shutdownNow();
+ }
+
+ /** The target pool size for {@code depth} under {@code options}. */
+ public static int desiredSize(long depth, AutoscaleOptions options) {
+ long perWorker = options.tasksPerWorker();
+ long target = (depth + perWorker - 1) / perWorker; // ceil(depth / perWorker)
+ return (int) Math.max(options.minWorkers(), Math.min(options.maxWorkers(), target));
+ }
+
+ private void tickSafely() {
+ try {
+ tick();
+ } catch (Throwable e) {
+ // scheduleAtFixedRate cancels all future runs if a task escapes with
+ // any Throwable — catch everything and log so the loop survives a bad tick.
+ LOG.log(Level.WARNING, "autoscaler tick failed; retrying next interval", e);
+ }
+ }
+
+ /** Read depth once and resize the pool. Package-visible for tests. */
+ void tick() {
+ resize(desiredSize(depth.getAsLong(), options));
+ }
+
+ private void resize(int target) {
+ // setCorePoolSize fails if it exceeds the max, so order the two writes.
+ if (target > pool.getMaximumPoolSize()) {
+ pool.setMaximumPoolSize(target);
+ pool.setCorePoolSize(target);
+ } else {
+ pool.setCorePoolSize(target);
+ pool.setMaximumPoolSize(target);
+ }
+ }
+
+ private static Thread daemon(Runnable runnable) {
+ Thread thread = new Thread(runnable, "taskito-autoscaler");
+ thread.setDaemon(true);
+ return thread;
+ }
+}
diff --git a/sdks/java/src/main/java/org/byteveda/taskito/autoscale/package-info.java b/sdks/java/src/main/java/org/byteveda/taskito/autoscale/package-info.java
new file mode 100644
index 00000000..c422551e
--- /dev/null
+++ b/sdks/java/src/main/java/org/byteveda/taskito/autoscale/package-info.java
@@ -0,0 +1,8 @@
+/**
+ * In-process autoscaling of a worker's handler thread pool. An
+ * {@link org.byteveda.taskito.autoscale.Autoscaler} periodically reads queue
+ * depth and resizes a {@link java.util.concurrent.ThreadPoolExecutor} between a
+ * min and max, so a worker grows under load and shrinks when idle. Enable it via
+ * {@code Worker.Builder.autoscale(...)}.
+ */
+package org.byteveda.taskito.autoscale;
diff --git a/sdks/java/src/main/java/org/byteveda/taskito/worker/Worker.java b/sdks/java/src/main/java/org/byteveda/taskito/worker/Worker.java
index 57255504..2e1f966a 100644
--- a/sdks/java/src/main/java/org/byteveda/taskito/worker/Worker.java
+++ b/sdks/java/src/main/java/org/byteveda/taskito/worker/Worker.java
@@ -1,5 +1,7 @@
package org.byteveda.taskito.worker;
+import com.fasterxml.jackson.core.JacksonException;
+import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.Arrays;
@@ -11,8 +13,12 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
+import org.byteveda.taskito.autoscale.AutoscaleOptions;
+import org.byteveda.taskito.autoscale.Autoscaler;
import org.byteveda.taskito.errors.SerializationException;
import org.byteveda.taskito.errors.WorkflowException;
import org.byteveda.taskito.events.Emitter;
@@ -41,15 +47,21 @@ public final class Worker implements AutoCloseable {
private final ExecutorService executor;
private final WorkflowTracker tracker;
private final ResourceRuntime resources;
+ private final Autoscaler autoscaler;
private final CountDownLatch shutdown = new CountDownLatch(1);
private boolean closed;
private Worker(
- WorkerControl control, ExecutorService executor, WorkflowTracker tracker, ResourceRuntime resources) {
+ WorkerControl control,
+ ExecutorService executor,
+ WorkflowTracker tracker,
+ ResourceRuntime resources,
+ Autoscaler autoscaler) {
this.control = control;
this.executor = executor;
this.tracker = tracker;
this.resources = resources;
+ this.autoscaler = autoscaler;
}
public static Builder builder(QueueBackend backend, Serializer serializer, List middleware) {
@@ -104,6 +116,9 @@ public synchronized void close() {
return;
}
closed = true;
+ if (autoscaler != null) {
+ autoscaler.close(); // stop resizing before we tear the pool down
+ }
control.stop(); // stop scheduling new work
executor.shutdown(); // stop accepting; let running handlers finish
try {
@@ -138,6 +153,7 @@ public static final class Builder {
private Integer channelCapacity;
private Integer batchSize;
private WorkflowTracker tracker;
+ private AutoscaleOptions autoscale;
Builder(QueueBackend backend, Serializer serializer, List middleware, ResourceRuntime resources) {
this.backend = backend;
@@ -206,6 +222,16 @@ public Builder batchSize(int batchSize) {
return this;
}
+ /**
+ * Autoscale the handler pool between {@code min} and {@code max} threads
+ * based on queue depth. Replaces the fixed/cached pool with a resizable
+ * one driven by an {@link Autoscaler}.
+ */
+ public Builder autoscale(AutoscaleOptions options) {
+ this.autoscale = options;
+ return this;
+ }
+
public Builder on(EventName name, Consumer listener) {
listeners.computeIfAbsent(name, key -> new ArrayList<>()).add(listener);
return this;
@@ -240,8 +266,21 @@ private WorkflowTracker ensureTracker() {
}
public Worker start() {
- ExecutorService executor =
- concurrency > 0 ? Executors.newFixedThreadPool(concurrency) : Executors.newCachedThreadPool();
+ Autoscaler scaler = null;
+ ExecutorService executor;
+ if (autoscale != null) {
+ ThreadPoolExecutor pool = new ThreadPoolExecutor(
+ autoscale.minWorkers(),
+ autoscale.maxWorkers(),
+ 60L,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>());
+ scaler = new Autoscaler(pool, this::currentDepth, autoscale);
+ executor = pool;
+ } else {
+ executor =
+ concurrency > 0 ? Executors.newFixedThreadPool(concurrency) : Executors.newCachedThreadPool();
+ }
Emitter emitter = new Emitter();
listeners.forEach((name, bound) -> bound.forEach(listener -> emitter.on(name, listener)));
WorkerDispatchBridge bridge =
@@ -250,7 +289,55 @@ public Worker start() {
bridge.bind(control);
// Lease worker resources only after the native worker started cleanly.
resources.acquireWorker();
- return new Worker(control, executor, tracker, resources);
+ if (scaler != null) {
+ scaler.start();
+ }
+ return new Worker(control, executor, tracker, resources, scaler);
+ }
+
+ /**
+ * Outstanding work (pending + running) this worker can actually consume.
+ * Scoped to the configured {@code queues} so backlog on queues this worker
+ * doesn't serve can't drive it to {@code maxWorkers}; unscoped otherwise.
+ */
+ private long currentDepth() {
+ if (queues == null || queues.isEmpty()) {
+ return depthFrom(backend.statsJson());
+ }
+ long total = 0;
+ for (String queue : queues) {
+ total += depthFrom(backend.statsByQueueJson(queue));
+ }
+ return total;
+ }
+
+ /**
+ * Depth from one stats blob. Propagates on failure rather than reporting
+ * {@code 0}: the autoscaler swallows a tick error and retries, whereas a
+ * false-empty reading would shrink the pool while backlog still exists.
+ */
+ private static long depthFrom(String statsJson) {
+ JsonNode stats;
+ try {
+ stats = JSON.readTree(statsJson);
+ } catch (JacksonException e) {
+ throw new IllegalStateException("failed to parse worker queue stats", e);
+ }
+ return numericField(stats, "pending") + numericField(stats, "running");
+ }
+
+ /**
+ * Read an integral stats field, rejecting a missing or non-numeric value
+ * instead of coercing it to {@code 0} (which would read as an empty queue
+ * and shrink the pool). The autoscaler swallows the resulting error and
+ * retries on the next tick.
+ */
+ private static long numericField(JsonNode stats, String name) {
+ JsonNode node = stats.get(name);
+ if (node == null || !node.canConvertToLong()) {
+ throw new IllegalStateException("worker queue stats field '" + name + "' is missing or non-numeric");
+ }
+ return node.asLong();
}
private String encodeOptions() {
diff --git a/sdks/java/src/test/java/org/byteveda/taskito/autoscale/AutoscalerTest.java b/sdks/java/src/test/java/org/byteveda/taskito/autoscale/AutoscalerTest.java
new file mode 100644
index 00000000..45586a94
--- /dev/null
+++ b/sdks/java/src/test/java/org/byteveda/taskito/autoscale/AutoscalerTest.java
@@ -0,0 +1,50 @@
+package org.byteveda.taskito.autoscale;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.time.Duration;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.junit.jupiter.api.Test;
+
+class AutoscalerTest {
+
+ private static final AutoscaleOptions OPTS = new AutoscaleOptions(1, 5, 10, Duration.ofSeconds(60));
+
+ @Test
+ void desiredSizeClampsToBounds() {
+ assertEquals(1, Autoscaler.desiredSize(0, OPTS)); // empty → floor
+ assertEquals(1, Autoscaler.desiredSize(5, OPTS)); // ceil(0.5) → floor
+ assertEquals(3, Autoscaler.desiredSize(25, OPTS)); // ceil(2.5)
+ assertEquals(5, Autoscaler.desiredSize(1000, OPTS)); // clamp to ceiling
+ }
+
+ @Test
+ void tickResizesPoolToDepth() {
+ ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
+ AtomicLong depth = new AtomicLong(0);
+ try (Autoscaler scaler = new Autoscaler(pool, depth::get, OPTS)) {
+ depth.set(30);
+ scaler.tick();
+ assertEquals(3, pool.getCorePoolSize());
+ assertEquals(3, pool.getMaximumPoolSize());
+
+ depth.set(0);
+ scaler.tick();
+ assertEquals(1, pool.getCorePoolSize());
+ } finally {
+ pool.shutdown();
+ }
+ }
+
+ @Test
+ void rejectsInvalidOptions() {
+ assertThrows(IllegalArgumentException.class, () -> new AutoscaleOptions(0, 5, 10, Duration.ofSeconds(1)));
+ assertThrows(IllegalArgumentException.class, () -> new AutoscaleOptions(3, 1, 10, Duration.ofSeconds(1)));
+ assertThrows(IllegalArgumentException.class, () -> new AutoscaleOptions(1, 5, 0, Duration.ofSeconds(1)));
+ assertThrows(IllegalArgumentException.class, () -> new AutoscaleOptions(1, 5, 10, Duration.ZERO));
+ }
+}