diff --git a/docs/content/docs/node/guides/core/index.mdx b/docs/content/docs/node/guides/core/index.mdx new file mode 100644 index 00000000..18933667 --- /dev/null +++ b/docs/content/docs/node/guides/core/index.mdx @@ -0,0 +1,18 @@ +--- +title: Core +description: "The building blocks of every Node.js taskito application." +--- + +The building blocks of every taskito application. + +| Guide | Description | +|---|---| +| [Tasks](/node/guides/core/tasks) | Register tasks with `queue.task()`, configure retries, timeouts, and options | +| [Queues & Priority](/node/guides/core/queues) | Named queues, priority levels, and routing | +| [Workers](/node/guides/core/workers) | Start workers, control concurrency, graceful shutdown | +| [Execution Models](/node/guides/core/execution-model) | How tasks move from enqueue to completion | +| [Enqueue options](/node/guides/core/enqueue-options) | Per-job delay, priority, uniqueness, and metadata | +| [Predicates](/node/guides/core/predicates) | Gate an enqueue on a runtime condition | +| [Scheduling](/node/guides/core/scheduling) | Periodic tasks with cron expressions | +| [Cancellation](/node/guides/core/cancellation) | Cancel queued and in-flight jobs | +| [Streaming partial results](/node/guides/core/streaming) | Emit progress from a running task | diff --git a/docs/content/docs/node/guides/extensibility/index.mdx b/docs/content/docs/node/guides/extensibility/index.mdx new file mode 100644 index 00000000..d7975634 --- /dev/null +++ b/docs/content/docs/node/guides/extensibility/index.mdx @@ -0,0 +1,13 @@ +--- +title: Extensibility +description: "Extend the Node.js SDK at every stage of the task lifecycle." +--- + +Extend taskito with custom behavior at every stage of the task lifecycle. + +| Guide | Description | +|---|---| +| [Events](/node/guides/extensibility/events) | Subscribe to queue and worker lifecycle events | +| [Per-Task Middleware](/node/guides/extensibility/middleware) | Wrap task execution with before/after/error hooks | +| [Webhooks](/node/guides/extensibility/webhooks) | Push signed notifications to external services | +| [Pluggable Serializers](/node/guides/extensibility/serializers) | Custom payload serialization — msgpack, encryption | diff --git a/docs/content/docs/node/guides/index.mdx b/docs/content/docs/node/guides/index.mdx new file mode 100644 index 00000000..8fbe9f8b --- /dev/null +++ b/docs/content/docs/node/guides/index.mdx @@ -0,0 +1,70 @@ +--- +title: Guides +description: Topical guides covering the Node.js SDK's surface area. +--- + +import { Cards, Card } from "fumadocs-ui/components/card"; +import { + Boxes, + ShieldCheck, + Wrench, + Activity, + Layers, + Workflow, + Plug, + Puzzle, +} from "lucide-react"; + +Pick the topic you're working through. Each guide is self-contained, with code +samples, gotchas, and links to the relevant API reference. + + + } + title="Core" + href="/node/guides/core" + description="Define tasks, route to queues, run workers, schedule periodics." + /> + } + title="Reliability" + href="/node/guides/reliability" + description="Retries, dead letters, circuit breakers, idempotency, error handling." + /> + } + title="Workflows" + href="/node/guides/workflows" + description="DAGs, fan-out, fan-in, conditions, gates, sub-workflows, saga compensation." + /> + } + title="Extensibility" + href="/node/guides/extensibility" + description="Custom serializers, middleware, events, webhooks." + /> + } + title="Resources" + href="/node/guides/resources" + description="Inject DB clients, HTTP sessions, and cloud SDKs by name; intercept enqueues." + /> + } + title="Integrations" + href="/node/guides/integrations" + description="Observability and web-framework helpers for the Node.js SDK." + /> + } + title="Operations" + href="/node/guides/operations" + description="Deployment, scaling, mesh, KEDA, migration from BullMQ." + /> + } + title="Observability" + href="/node/guides/observability" + description="Monitor queue health, the dashboard, and structured logs." + /> + diff --git a/docs/content/docs/node/guides/meta.json b/docs/content/docs/node/guides/meta.json index d884b213..2a3cf46a 100644 --- a/docs/content/docs/node/guides/meta.json +++ b/docs/content/docs/node/guides/meta.json @@ -1 +1,16 @@ -{ "title": "Guides", "root": true, "pages": ["core", "reliability", "workflows", "extensibility", "resources", "integrations", "operations", "observability"] } +{ + "title": "Guides", + "root": true, + "pages": [ + "index", + "core", + "reliability", + "workflows", + "extensibility", + "resources", + "integrations", + "operations", + "observability" + ] +} + diff --git a/docs/content/docs/node/guides/operations/index.mdx b/docs/content/docs/node/guides/operations/index.mdx new file mode 100644 index 00000000..8f666d89 --- /dev/null +++ b/docs/content/docs/node/guides/operations/index.mdx @@ -0,0 +1,21 @@ +--- +title: Operations +description: "Run the Node.js SDK reliably in production — deployment, scaling, migration." +--- + +Run taskito reliably in production. + +| Guide | Description | +|---|---| +| [Backends](/node/guides/operations/backends) | SQLite, Postgres, and Redis — setup and trade-offs | +| [Job Management](/node/guides/operations/inspection) | Inspect, cancel, replay, and clean up jobs | +| [Dashboard](/node/guides/operations/dashboard) | Serve the web dashboard from a Node worker | +| [Dashboard REST API](/node/guides/operations/dashboard-api) | The JSON API behind the dashboard | +| [Mesh Scheduling](/node/guides/operations/mesh) | Decentralized work-stealing across worker nodes | +| [KEDA Autoscaling](/node/guides/operations/keda) | Kubernetes event-driven autoscaling from queue depth | +| [CLI](/node/guides/operations/cli) | Manage queues and workers from the command line | +| [Testing](/node/guides/operations/testing) | Test mode, fixtures, and mocking | +| [Security](/node/guides/operations/security) | Signing, encryption, and dashboard auth | +| [Troubleshooting](/node/guides/operations/troubleshooting) | Diagnose stuck jobs and worker issues | +| [Deployment](/node/guides/operations/deployment) | Docker, process managers, and production checklists | +| [Migrating from BullMQ](/node/guides/operations/migration) | Side-by-side comparison and migration guide | diff --git a/docs/content/docs/node/guides/reliability/index.mdx b/docs/content/docs/node/guides/reliability/index.mdx new file mode 100644 index 00000000..d0d009ef --- /dev/null +++ b/docs/content/docs/node/guides/reliability/index.mdx @@ -0,0 +1,19 @@ +--- +title: Reliability +description: "Harden your Node.js task queue for production workloads." +--- + +Harden your task queue for production workloads. + +| Guide | Description | +|---|---| +| [Error Handling](/node/guides/reliability/error-handling) | Task failure lifecycle, error inspection, debugging patterns | +| [Delivery Guarantees](/node/guides/reliability/guarantees) | At-least-once delivery and exactly-once patterns | +| [Retries](/node/guides/reliability/retries) | Automatic retries with exponential backoff | +| [Timeouts](/node/guides/reliability/timeouts) | Bound task runtime and reclaim stalled jobs | +| [Idempotency](/node/guides/reliability/idempotency) | Deduplicate enqueues with unique keys | +| [Rate Limiting](/node/guides/reliability/rate-limiting) | Throttle task execution with token-bucket limits | +| [Concurrency](/node/guides/reliability/concurrency) | Cap how many jobs of a task run at once | +| [Circuit Breakers](/node/guides/reliability/circuit-breakers) | Protect downstream services from cascading failures | +| [Dead-letter queue](/node/guides/reliability/dead-letter) | Capture and replay exhausted jobs | +| [Distributed Locking](/node/guides/reliability/locks) | Mutual exclusion across workers with database-backed locks | diff --git a/sdks/java/src/main/java/org/byteveda/taskito/autoscale/AutoscaleOptions.java b/sdks/java/src/main/java/org/byteveda/taskito/autoscale/AutoscaleOptions.java new file mode 100644 index 00000000..7ca91c34 --- /dev/null +++ b/sdks/java/src/main/java/org/byteveda/taskito/autoscale/AutoscaleOptions.java @@ -0,0 +1,33 @@ +package org.byteveda.taskito.autoscale; + +import java.time.Duration; + +/** + * Tuning for the {@link Autoscaler}. + * + * @param minWorkers the floor on handler threads (must be ≥ 1) + * @param maxWorkers the ceiling on handler threads (must be ≥ {@code minWorkers}) + * @param tasksPerWorker desired outstanding tasks per worker thread (must be ≥ 1) + * @param interval how often to re-evaluate (must be positive) + */ +public record AutoscaleOptions(int minWorkers, int maxWorkers, int tasksPerWorker, Duration interval) { + public AutoscaleOptions { + if (minWorkers < 1) { + throw new IllegalArgumentException("minWorkers must be >= 1"); + } + if (maxWorkers < minWorkers) { + throw new IllegalArgumentException("maxWorkers must be >= minWorkers"); + } + if (tasksPerWorker < 1) { + throw new IllegalArgumentException("tasksPerWorker must be >= 1"); + } + if (interval == null || interval.isNegative() || interval.isZero()) { + throw new IllegalArgumentException("interval must be positive"); + } + } + + /** Defaults: scale {@code min..max} threads, ~10 tasks per worker, re-evaluated every 2s. */ + public static AutoscaleOptions of(int minWorkers, int maxWorkers) { + return new AutoscaleOptions(minWorkers, maxWorkers, 10, Duration.ofSeconds(2)); + } +} diff --git a/sdks/java/src/main/java/org/byteveda/taskito/autoscale/Autoscaler.java b/sdks/java/src/main/java/org/byteveda/taskito/autoscale/Autoscaler.java new file mode 100644 index 00000000..a77f878b --- /dev/null +++ b/sdks/java/src/main/java/org/byteveda/taskito/autoscale/Autoscaler.java @@ -0,0 +1,83 @@ +package org.byteveda.taskito.autoscale; + +import java.lang.System.Logger; +import java.lang.System.Logger.Level; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.function.LongSupplier; + +/** + * Periodically resizes a worker's handler pool to {@code ceil(depth / + * tasksPerWorker)} clamped to {@code [minWorkers, maxWorkers]}, where depth is + * the queue's outstanding work. Growing raises the pool ceiling before its core; + * shrinking lowers the core and lets idle threads time out — running handlers are + * never interrupted. + */ +public final class Autoscaler implements AutoCloseable { + private static final Logger LOG = System.getLogger(Autoscaler.class.getName()); + + private final ThreadPoolExecutor pool; + private final LongSupplier depth; + private final AutoscaleOptions options; + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(Autoscaler::daemon); + + public Autoscaler(ThreadPoolExecutor pool, LongSupplier depth, AutoscaleOptions options) { + this.pool = pool; + this.depth = depth; + this.options = options; + // Let core threads retire so the pool actually shrinks when idle. + pool.allowCoreThreadTimeOut(true); + } + + /** Begin periodic resizing. */ + public void start() { + long ms = options.interval().toMillis(); + scheduler.scheduleAtFixedRate(this::tickSafely, ms, ms, TimeUnit.MILLISECONDS); + } + + @Override + public void close() { + scheduler.shutdownNow(); + } + + /** The target pool size for {@code depth} under {@code options}. */ + public static int desiredSize(long depth, AutoscaleOptions options) { + long perWorker = options.tasksPerWorker(); + long target = (depth + perWorker - 1) / perWorker; // ceil(depth / perWorker) + return (int) Math.max(options.minWorkers(), Math.min(options.maxWorkers(), target)); + } + + private void tickSafely() { + try { + tick(); + } catch (Throwable e) { + // scheduleAtFixedRate cancels all future runs if a task escapes with + // any Throwable — catch everything and log so the loop survives a bad tick. + LOG.log(Level.WARNING, "autoscaler tick failed; retrying next interval", e); + } + } + + /** Read depth once and resize the pool. Package-visible for tests. */ + void tick() { + resize(desiredSize(depth.getAsLong(), options)); + } + + private void resize(int target) { + // setCorePoolSize fails if it exceeds the max, so order the two writes. + if (target > pool.getMaximumPoolSize()) { + pool.setMaximumPoolSize(target); + pool.setCorePoolSize(target); + } else { + pool.setCorePoolSize(target); + pool.setMaximumPoolSize(target); + } + } + + private static Thread daemon(Runnable runnable) { + Thread thread = new Thread(runnable, "taskito-autoscaler"); + thread.setDaemon(true); + return thread; + } +} diff --git a/sdks/java/src/main/java/org/byteveda/taskito/autoscale/package-info.java b/sdks/java/src/main/java/org/byteveda/taskito/autoscale/package-info.java new file mode 100644 index 00000000..c422551e --- /dev/null +++ b/sdks/java/src/main/java/org/byteveda/taskito/autoscale/package-info.java @@ -0,0 +1,8 @@ +/** + * In-process autoscaling of a worker's handler thread pool. An + * {@link org.byteveda.taskito.autoscale.Autoscaler} periodically reads queue + * depth and resizes a {@link java.util.concurrent.ThreadPoolExecutor} between a + * min and max, so a worker grows under load and shrinks when idle. Enable it via + * {@code Worker.Builder.autoscale(...)}. + */ +package org.byteveda.taskito.autoscale; diff --git a/sdks/java/src/main/java/org/byteveda/taskito/worker/Worker.java b/sdks/java/src/main/java/org/byteveda/taskito/worker/Worker.java index 57255504..2e1f966a 100644 --- a/sdks/java/src/main/java/org/byteveda/taskito/worker/Worker.java +++ b/sdks/java/src/main/java/org/byteveda/taskito/worker/Worker.java @@ -1,5 +1,7 @@ package org.byteveda.taskito.worker; +import com.fasterxml.jackson.core.JacksonException; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import java.util.ArrayList; import java.util.Arrays; @@ -11,8 +13,12 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import org.byteveda.taskito.autoscale.AutoscaleOptions; +import org.byteveda.taskito.autoscale.Autoscaler; import org.byteveda.taskito.errors.SerializationException; import org.byteveda.taskito.errors.WorkflowException; import org.byteveda.taskito.events.Emitter; @@ -41,15 +47,21 @@ public final class Worker implements AutoCloseable { private final ExecutorService executor; private final WorkflowTracker tracker; private final ResourceRuntime resources; + private final Autoscaler autoscaler; private final CountDownLatch shutdown = new CountDownLatch(1); private boolean closed; private Worker( - WorkerControl control, ExecutorService executor, WorkflowTracker tracker, ResourceRuntime resources) { + WorkerControl control, + ExecutorService executor, + WorkflowTracker tracker, + ResourceRuntime resources, + Autoscaler autoscaler) { this.control = control; this.executor = executor; this.tracker = tracker; this.resources = resources; + this.autoscaler = autoscaler; } public static Builder builder(QueueBackend backend, Serializer serializer, List middleware) { @@ -104,6 +116,9 @@ public synchronized void close() { return; } closed = true; + if (autoscaler != null) { + autoscaler.close(); // stop resizing before we tear the pool down + } control.stop(); // stop scheduling new work executor.shutdown(); // stop accepting; let running handlers finish try { @@ -138,6 +153,7 @@ public static final class Builder { private Integer channelCapacity; private Integer batchSize; private WorkflowTracker tracker; + private AutoscaleOptions autoscale; Builder(QueueBackend backend, Serializer serializer, List middleware, ResourceRuntime resources) { this.backend = backend; @@ -206,6 +222,16 @@ public Builder batchSize(int batchSize) { return this; } + /** + * Autoscale the handler pool between {@code min} and {@code max} threads + * based on queue depth. Replaces the fixed/cached pool with a resizable + * one driven by an {@link Autoscaler}. + */ + public Builder autoscale(AutoscaleOptions options) { + this.autoscale = options; + return this; + } + public Builder on(EventName name, Consumer listener) { listeners.computeIfAbsent(name, key -> new ArrayList<>()).add(listener); return this; @@ -240,8 +266,21 @@ private WorkflowTracker ensureTracker() { } public Worker start() { - ExecutorService executor = - concurrency > 0 ? Executors.newFixedThreadPool(concurrency) : Executors.newCachedThreadPool(); + Autoscaler scaler = null; + ExecutorService executor; + if (autoscale != null) { + ThreadPoolExecutor pool = new ThreadPoolExecutor( + autoscale.minWorkers(), + autoscale.maxWorkers(), + 60L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>()); + scaler = new Autoscaler(pool, this::currentDepth, autoscale); + executor = pool; + } else { + executor = + concurrency > 0 ? Executors.newFixedThreadPool(concurrency) : Executors.newCachedThreadPool(); + } Emitter emitter = new Emitter(); listeners.forEach((name, bound) -> bound.forEach(listener -> emitter.on(name, listener))); WorkerDispatchBridge bridge = @@ -250,7 +289,55 @@ public Worker start() { bridge.bind(control); // Lease worker resources only after the native worker started cleanly. resources.acquireWorker(); - return new Worker(control, executor, tracker, resources); + if (scaler != null) { + scaler.start(); + } + return new Worker(control, executor, tracker, resources, scaler); + } + + /** + * Outstanding work (pending + running) this worker can actually consume. + * Scoped to the configured {@code queues} so backlog on queues this worker + * doesn't serve can't drive it to {@code maxWorkers}; unscoped otherwise. + */ + private long currentDepth() { + if (queues == null || queues.isEmpty()) { + return depthFrom(backend.statsJson()); + } + long total = 0; + for (String queue : queues) { + total += depthFrom(backend.statsByQueueJson(queue)); + } + return total; + } + + /** + * Depth from one stats blob. Propagates on failure rather than reporting + * {@code 0}: the autoscaler swallows a tick error and retries, whereas a + * false-empty reading would shrink the pool while backlog still exists. + */ + private static long depthFrom(String statsJson) { + JsonNode stats; + try { + stats = JSON.readTree(statsJson); + } catch (JacksonException e) { + throw new IllegalStateException("failed to parse worker queue stats", e); + } + return numericField(stats, "pending") + numericField(stats, "running"); + } + + /** + * Read an integral stats field, rejecting a missing or non-numeric value + * instead of coercing it to {@code 0} (which would read as an empty queue + * and shrink the pool). The autoscaler swallows the resulting error and + * retries on the next tick. + */ + private static long numericField(JsonNode stats, String name) { + JsonNode node = stats.get(name); + if (node == null || !node.canConvertToLong()) { + throw new IllegalStateException("worker queue stats field '" + name + "' is missing or non-numeric"); + } + return node.asLong(); } private String encodeOptions() { diff --git a/sdks/java/src/test/java/org/byteveda/taskito/autoscale/AutoscalerTest.java b/sdks/java/src/test/java/org/byteveda/taskito/autoscale/AutoscalerTest.java new file mode 100644 index 00000000..45586a94 --- /dev/null +++ b/sdks/java/src/test/java/org/byteveda/taskito/autoscale/AutoscalerTest.java @@ -0,0 +1,50 @@ +package org.byteveda.taskito.autoscale; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.time.Duration; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import org.junit.jupiter.api.Test; + +class AutoscalerTest { + + private static final AutoscaleOptions OPTS = new AutoscaleOptions(1, 5, 10, Duration.ofSeconds(60)); + + @Test + void desiredSizeClampsToBounds() { + assertEquals(1, Autoscaler.desiredSize(0, OPTS)); // empty → floor + assertEquals(1, Autoscaler.desiredSize(5, OPTS)); // ceil(0.5) → floor + assertEquals(3, Autoscaler.desiredSize(25, OPTS)); // ceil(2.5) + assertEquals(5, Autoscaler.desiredSize(1000, OPTS)); // clamp to ceiling + } + + @Test + void tickResizesPoolToDepth() { + ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); + AtomicLong depth = new AtomicLong(0); + try (Autoscaler scaler = new Autoscaler(pool, depth::get, OPTS)) { + depth.set(30); + scaler.tick(); + assertEquals(3, pool.getCorePoolSize()); + assertEquals(3, pool.getMaximumPoolSize()); + + depth.set(0); + scaler.tick(); + assertEquals(1, pool.getCorePoolSize()); + } finally { + pool.shutdown(); + } + } + + @Test + void rejectsInvalidOptions() { + assertThrows(IllegalArgumentException.class, () -> new AutoscaleOptions(0, 5, 10, Duration.ofSeconds(1))); + assertThrows(IllegalArgumentException.class, () -> new AutoscaleOptions(3, 1, 10, Duration.ofSeconds(1))); + assertThrows(IllegalArgumentException.class, () -> new AutoscaleOptions(1, 5, 0, Duration.ofSeconds(1))); + assertThrows(IllegalArgumentException.class, () -> new AutoscaleOptions(1, 5, 10, Duration.ZERO)); + } +}