-
Notifications
You must be signed in to change notification settings - Fork 0
feat(java): in-process worker autoscaling #345
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
+422
−5
Merged
Changes from all commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
75945fb
feat(java): in-process worker autoscaling
pratyush618 f166902
test(java): cover the autoscaler
pratyush618 ac3c5a9
fix(java): autoscaler tick catches Throwable and logs
pratyush618 a7a00c9
fix(java): propagate worker depth read failure
pratyush618 53e0eb5
fix(java): scope autoscale depth to worker queues
pratyush618 bc378dc
docs(node): add missing guides landing pages
pratyush618 60fc54a
fix(java): reject non-numeric worker stats fields
pratyush618 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| --- | ||
| title: Core | ||
| description: "The building blocks of every Node.js taskito application." | ||
| --- | ||
|
|
||
| The building blocks of every taskito application. | ||
|
|
||
| | Guide | Description | | ||
| |---|---| | ||
| | [Tasks](/node/guides/core/tasks) | Register tasks with `queue.task()`, configure retries, timeouts, and options | | ||
| | [Queues & Priority](/node/guides/core/queues) | Named queues, priority levels, and routing | | ||
| | [Workers](/node/guides/core/workers) | Start workers, control concurrency, graceful shutdown | | ||
| | [Execution Models](/node/guides/core/execution-model) | How tasks move from enqueue to completion | | ||
| | [Enqueue options](/node/guides/core/enqueue-options) | Per-job delay, priority, uniqueness, and metadata | | ||
| | [Predicates](/node/guides/core/predicates) | Gate an enqueue on a runtime condition | | ||
| | [Scheduling](/node/guides/core/scheduling) | Periodic tasks with cron expressions | | ||
| | [Cancellation](/node/guides/core/cancellation) | Cancel queued and in-flight jobs | | ||
| | [Streaming partial results](/node/guides/core/streaming) | Emit progress from a running task | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,13 @@ | ||
| --- | ||
| title: Extensibility | ||
| description: "Extend the Node.js SDK at every stage of the task lifecycle." | ||
| --- | ||
|
|
||
| Extend taskito with custom behavior at every stage of the task lifecycle. | ||
|
|
||
| | Guide | Description | | ||
| |---|---| | ||
| | [Events](/node/guides/extensibility/events) | Subscribe to queue and worker lifecycle events | | ||
| | [Per-Task Middleware](/node/guides/extensibility/middleware) | Wrap task execution with before/after/error hooks | | ||
| | [Webhooks](/node/guides/extensibility/webhooks) | Push signed notifications to external services | | ||
| | [Pluggable Serializers](/node/guides/extensibility/serializers) | Custom payload serialization — msgpack, encryption | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,70 @@ | ||
| --- | ||
| title: Guides | ||
| description: Topical guides covering the Node.js SDK's surface area. | ||
| --- | ||
|
|
||
| import { Cards, Card } from "fumadocs-ui/components/card"; | ||
| import { | ||
| Boxes, | ||
| ShieldCheck, | ||
| Wrench, | ||
| Activity, | ||
| Layers, | ||
| Workflow, | ||
| Plug, | ||
| Puzzle, | ||
| } from "lucide-react"; | ||
|
|
||
| Pick the topic you're working through. Each guide is self-contained, with code | ||
| samples, gotchas, and links to the relevant API reference. | ||
|
|
||
| <Cards> | ||
| <Card | ||
| icon={<Boxes />} | ||
| title="Core" | ||
| href="/node/guides/core" | ||
| description="Define tasks, route to queues, run workers, schedule periodics." | ||
| /> | ||
| <Card | ||
| icon={<ShieldCheck />} | ||
| title="Reliability" | ||
| href="/node/guides/reliability" | ||
| description="Retries, dead letters, circuit breakers, idempotency, error handling." | ||
| /> | ||
| <Card | ||
| icon={<Workflow />} | ||
| title="Workflows" | ||
| href="/node/guides/workflows" | ||
| description="DAGs, fan-out, fan-in, conditions, gates, sub-workflows, saga compensation." | ||
| /> | ||
| <Card | ||
| icon={<Puzzle />} | ||
| title="Extensibility" | ||
| href="/node/guides/extensibility" | ||
| description="Custom serializers, middleware, events, webhooks." | ||
| /> | ||
| <Card | ||
| icon={<Layers />} | ||
| title="Resources" | ||
| href="/node/guides/resources" | ||
| description="Inject DB clients, HTTP sessions, and cloud SDKs by name; intercept enqueues." | ||
| /> | ||
| <Card | ||
| icon={<Plug />} | ||
| title="Integrations" | ||
| href="/node/guides/integrations" | ||
| description="Observability and web-framework helpers for the Node.js SDK." | ||
| /> | ||
| <Card | ||
| icon={<Wrench />} | ||
| title="Operations" | ||
| href="/node/guides/operations" | ||
| description="Deployment, scaling, mesh, KEDA, migration from BullMQ." | ||
| /> | ||
| <Card | ||
| icon={<Activity />} | ||
| title="Observability" | ||
| href="/node/guides/observability" | ||
| description="Monitor queue health, the dashboard, and structured logs." | ||
| /> | ||
| </Cards> |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1 +1,16 @@ | ||
| { "title": "Guides", "root": true, "pages": ["core", "reliability", "workflows", "extensibility", "resources", "integrations", "operations", "observability"] } | ||
| { | ||
| "title": "Guides", | ||
| "root": true, | ||
| "pages": [ | ||
| "index", | ||
| "core", | ||
| "reliability", | ||
| "workflows", | ||
| "extensibility", | ||
| "resources", | ||
| "integrations", | ||
| "operations", | ||
| "observability" | ||
| ] | ||
| } | ||
|
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,21 @@ | ||
| --- | ||
| title: Operations | ||
| description: "Run the Node.js SDK reliably in production — deployment, scaling, migration." | ||
| --- | ||
|
|
||
| Run taskito reliably in production. | ||
|
|
||
| | Guide | Description | | ||
| |---|---| | ||
| | [Backends](/node/guides/operations/backends) | SQLite, Postgres, and Redis — setup and trade-offs | | ||
| | [Job Management](/node/guides/operations/inspection) | Inspect, cancel, replay, and clean up jobs | | ||
| | [Dashboard](/node/guides/operations/dashboard) | Serve the web dashboard from a Node worker | | ||
| | [Dashboard REST API](/node/guides/operations/dashboard-api) | The JSON API behind the dashboard | | ||
| | [Mesh Scheduling](/node/guides/operations/mesh) | Decentralized work-stealing across worker nodes | | ||
| | [KEDA Autoscaling](/node/guides/operations/keda) | Kubernetes event-driven autoscaling from queue depth | | ||
| | [CLI](/node/guides/operations/cli) | Manage queues and workers from the command line | | ||
| | [Testing](/node/guides/operations/testing) | Test mode, fixtures, and mocking | | ||
| | [Security](/node/guides/operations/security) | Signing, encryption, and dashboard auth | | ||
| | [Troubleshooting](/node/guides/operations/troubleshooting) | Diagnose stuck jobs and worker issues | | ||
| | [Deployment](/node/guides/operations/deployment) | Docker, process managers, and production checklists | | ||
| | [Migrating from BullMQ](/node/guides/operations/migration) | Side-by-side comparison and migration guide | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,19 @@ | ||
| --- | ||
| title: Reliability | ||
| description: "Harden your Node.js task queue for production workloads." | ||
| --- | ||
|
|
||
| Harden your task queue for production workloads. | ||
|
|
||
| | Guide | Description | | ||
| |---|---| | ||
| | [Error Handling](/node/guides/reliability/error-handling) | Task failure lifecycle, error inspection, debugging patterns | | ||
| | [Delivery Guarantees](/node/guides/reliability/guarantees) | At-least-once delivery and exactly-once patterns | | ||
| | [Retries](/node/guides/reliability/retries) | Automatic retries with exponential backoff | | ||
| | [Timeouts](/node/guides/reliability/timeouts) | Bound task runtime and reclaim stalled jobs | | ||
| | [Idempotency](/node/guides/reliability/idempotency) | Deduplicate enqueues with unique keys | | ||
| | [Rate Limiting](/node/guides/reliability/rate-limiting) | Throttle task execution with token-bucket limits | | ||
| | [Concurrency](/node/guides/reliability/concurrency) | Cap how many jobs of a task run at once | | ||
| | [Circuit Breakers](/node/guides/reliability/circuit-breakers) | Protect downstream services from cascading failures | | ||
| | [Dead-letter queue](/node/guides/reliability/dead-letter) | Capture and replay exhausted jobs | | ||
| | [Distributed Locking](/node/guides/reliability/locks) | Mutual exclusion across workers with database-backed locks | |
33 changes: 33 additions & 0 deletions
33
sdks/java/src/main/java/org/byteveda/taskito/autoscale/AutoscaleOptions.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,33 @@ | ||
| package org.byteveda.taskito.autoscale; | ||
|
|
||
| import java.time.Duration; | ||
|
|
||
| /** | ||
| * Tuning for the {@link Autoscaler}. | ||
| * | ||
| * @param minWorkers the floor on handler threads (must be ≥ 1) | ||
| * @param maxWorkers the ceiling on handler threads (must be ≥ {@code minWorkers}) | ||
| * @param tasksPerWorker desired outstanding tasks per worker thread (must be ≥ 1) | ||
| * @param interval how often to re-evaluate (must be positive) | ||
| */ | ||
| public record AutoscaleOptions(int minWorkers, int maxWorkers, int tasksPerWorker, Duration interval) { | ||
| public AutoscaleOptions { | ||
| if (minWorkers < 1) { | ||
| throw new IllegalArgumentException("minWorkers must be >= 1"); | ||
| } | ||
| if (maxWorkers < minWorkers) { | ||
| throw new IllegalArgumentException("maxWorkers must be >= minWorkers"); | ||
| } | ||
| if (tasksPerWorker < 1) { | ||
| throw new IllegalArgumentException("tasksPerWorker must be >= 1"); | ||
| } | ||
| if (interval == null || interval.isNegative() || interval.isZero()) { | ||
| throw new IllegalArgumentException("interval must be positive"); | ||
| } | ||
| } | ||
|
|
||
| /** Defaults: scale {@code min..max} threads, ~10 tasks per worker, re-evaluated every 2s. */ | ||
| public static AutoscaleOptions of(int minWorkers, int maxWorkers) { | ||
| return new AutoscaleOptions(minWorkers, maxWorkers, 10, Duration.ofSeconds(2)); | ||
| } | ||
| } |
83 changes: 83 additions & 0 deletions
83
sdks/java/src/main/java/org/byteveda/taskito/autoscale/Autoscaler.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,83 @@ | ||
| package org.byteveda.taskito.autoscale; | ||
|
|
||
| import java.lang.System.Logger; | ||
| import java.lang.System.Logger.Level; | ||
| import java.util.concurrent.Executors; | ||
| import java.util.concurrent.ScheduledExecutorService; | ||
| import java.util.concurrent.ThreadPoolExecutor; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.function.LongSupplier; | ||
|
|
||
| /** | ||
| * Periodically resizes a worker's handler pool to {@code ceil(depth / | ||
| * tasksPerWorker)} clamped to {@code [minWorkers, maxWorkers]}, where depth is | ||
| * the queue's outstanding work. Growing raises the pool ceiling before its core; | ||
| * shrinking lowers the core and lets idle threads time out — running handlers are | ||
| * never interrupted. | ||
| */ | ||
| public final class Autoscaler implements AutoCloseable { | ||
| private static final Logger LOG = System.getLogger(Autoscaler.class.getName()); | ||
|
|
||
| private final ThreadPoolExecutor pool; | ||
| private final LongSupplier depth; | ||
| private final AutoscaleOptions options; | ||
| private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(Autoscaler::daemon); | ||
|
|
||
| public Autoscaler(ThreadPoolExecutor pool, LongSupplier depth, AutoscaleOptions options) { | ||
| this.pool = pool; | ||
| this.depth = depth; | ||
| this.options = options; | ||
| // Let core threads retire so the pool actually shrinks when idle. | ||
| pool.allowCoreThreadTimeOut(true); | ||
| } | ||
|
|
||
| /** Begin periodic resizing. */ | ||
| public void start() { | ||
| long ms = options.interval().toMillis(); | ||
| scheduler.scheduleAtFixedRate(this::tickSafely, ms, ms, TimeUnit.MILLISECONDS); | ||
| } | ||
|
|
||
| @Override | ||
| public void close() { | ||
| scheduler.shutdownNow(); | ||
| } | ||
|
|
||
| /** The target pool size for {@code depth} under {@code options}. */ | ||
| public static int desiredSize(long depth, AutoscaleOptions options) { | ||
| long perWorker = options.tasksPerWorker(); | ||
| long target = (depth + perWorker - 1) / perWorker; // ceil(depth / perWorker) | ||
| return (int) Math.max(options.minWorkers(), Math.min(options.maxWorkers(), target)); | ||
| } | ||
|
|
||
| private void tickSafely() { | ||
| try { | ||
| tick(); | ||
| } catch (Throwable e) { | ||
| // scheduleAtFixedRate cancels all future runs if a task escapes with | ||
| // any Throwable — catch everything and log so the loop survives a bad tick. | ||
| LOG.log(Level.WARNING, "autoscaler tick failed; retrying next interval", e); | ||
| } | ||
| } | ||
|
|
||
| /** Read depth once and resize the pool. Package-visible for tests. */ | ||
| void tick() { | ||
| resize(desiredSize(depth.getAsLong(), options)); | ||
| } | ||
|
|
||
| private void resize(int target) { | ||
| // setCorePoolSize fails if it exceeds the max, so order the two writes. | ||
| if (target > pool.getMaximumPoolSize()) { | ||
| pool.setMaximumPoolSize(target); | ||
| pool.setCorePoolSize(target); | ||
| } else { | ||
| pool.setCorePoolSize(target); | ||
| pool.setMaximumPoolSize(target); | ||
| } | ||
| } | ||
|
|
||
| private static Thread daemon(Runnable runnable) { | ||
| Thread thread = new Thread(runnable, "taskito-autoscaler"); | ||
| thread.setDaemon(true); | ||
| return thread; | ||
| } | ||
| } | ||
8 changes: 8 additions & 0 deletions
8
sdks/java/src/main/java/org/byteveda/taskito/autoscale/package-info.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| /** | ||
| * In-process autoscaling of a worker's handler thread pool. An | ||
| * {@link org.byteveda.taskito.autoscale.Autoscaler} periodically reads queue | ||
| * depth and resizes a {@link java.util.concurrent.ThreadPoolExecutor} between a | ||
| * min and max, so a worker grows under load and shrinks when idle. Enable it via | ||
| * {@code Worker.Builder.autoscale(...)}. | ||
| */ | ||
| package org.byteveda.taskito.autoscale; |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.