feat(java): in-process worker autoscaling #345
Worker.Builder.autoscale(min,max) runs a resizable ThreadPoolExecutor; an Autoscaler resizes it to ceil(depth/tasksPerWorker) on an interval, growing under load and shrinking when idle (running handlers keep going).
desiredSize clamping, tick-driven resize, option validation.
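The sizing rule described above, `ceil(depth / tasksPerWorker)` clamped to the worker bounds, can be sketched as a pure function. This is a minimal illustration and not the PR's `desiredSize(depth, options)`: the class name `DesiredSizeSketch`, the flattened parameter list, and the argument checks are assumptions made for the example.

```java
// Hypothetical stand-in for the PR's sizing math; not the actual Autoscaler code.
final class DesiredSizeSketch {
    private DesiredSizeSketch() {}

    /** Returns ceil(depth / tasksPerWorker), clamped to [minWorkers, maxWorkers]. */
    static int desiredSize(long depth, int minWorkers, int maxWorkers, int tasksPerWorker) {
        // Assumed validation; the real AutoscaleOptions may check different things.
        if (tasksPerWorker <= 0 || minWorkers < 0 || maxWorkers < minWorkers) {
            throw new IllegalArgumentException("invalid autoscale bounds");
        }
        long nonNegative = Math.max(0L, depth);
        // Integer ceiling without floating point: quotient, plus one if there is a remainder.
        long needed = nonNegative / tasksPerWorker + (nonNegative % tasksPerWorker == 0 ? 0 : 1);
        // Clamp; the result fits in an int because it never exceeds maxWorkers.
        return (int) Math.max(minWorkers, Math.min(maxWorkers, needed));
    }
}
```

With `minWorkers = 1`, `maxWorkers = 5`, and `tasksPerWorker = 10`, a depth of 25 yields 3 workers, a depth of 0 yields 1, and a depth of 1000 yields 5.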
@CodeRabbit review
✅ Action performed
Review finished.
Actionable comments posted: 3
🧹 Nitpick comments (4)
sdks/java/src/main/java/org/byteveda/taskito/autoscale/Autoscaler.java (1)
30-34: 🎯 Functional Correctness | 🔵 Trivial | ⚡ Quick win
No initial resize until the first interval elapses.
`scheduleAtFixedRate(this::tickSafely, ms, ms, ...)` uses `ms` as both the initial delay and the period, so the pool keeps whatever size it was constructed with for a full `interval` before the first tick runs. If the initial pool size doesn't match the computed bound (e.g., constructed with `minWorkers` core/max but current load already needs more), the worker under-scales for that first window.
Consider ticking once immediately on `start()`:
💡 Proposed fix
     public void start() {
         long ms = options.interval().toMillis();
    -    scheduler.scheduleAtFixedRate(this::tickSafely, ms, ms, TimeUnit.MILLISECONDS);
    +    scheduler.scheduleAtFixedRate(this::tickSafely, 0, ms, TimeUnit.MILLISECONDS);
     }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@sdks/java/src/main/java/org/byteveda/taskito/autoscale/Autoscaler.java` around lines 30 - 34, The Autoscaler.start() method currently waits one full interval before the first resize because scheduleAtFixedRate(this::tickSafely, ms, ms, ...) uses the same delay and period; change it so the first scaling decision happens immediately when start() is called, then continue scheduling periodic ticks using options.interval() through the existing tickSafely path.
sdks/java/src/test/java/org/byteveda/taskito/autoscale/AutoscalerTest.java (3)
43-49: 📐 Maintainability & Code Quality | 🔵 Trivial | 💤 Low value
Consider covering null/negative interval validation cases.
Per `AutoscaleOptions.java:13-27`, the constructor also rejects a `null` interval and negative durations, not just `Duration.ZERO`. Adding these cases would round out validation coverage.
🧪 Optional additional cases
     assertThrows(IllegalArgumentException.class, () -> new AutoscaleOptions(1, 5, 10, Duration.ZERO));
    +assertThrows(IllegalArgumentException.class, () -> new AutoscaleOptions(1, 5, 10, Duration.ofSeconds(-1)));
    +assertThrows(IllegalArgumentException.class, () -> new AutoscaleOptions(1, 5, 10, null));
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@sdks/java/src/test/java/org/byteveda/taskito/autoscale/AutoscalerTest.java` around lines 43 - 49, The AutoscaleOptions validation test currently covers only zero duration and misses the null/negative interval cases rejected by the constructor. Update AutoscalerTest.rejectsInvalidOptions to add assertions for a null interval and a negative Duration when constructing AutoscaleOptions, using the existing reject-invalid-options pattern so the coverage matches the validation in AutoscaleOptions.
25-41: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚖️ Poor tradeoff
No test for `close()` stopping the scheduled loop.
The PR objectives note the autoscaler uses a daemon scheduler, stopped via `close()`. This test only exercises manual `tick()` calls and doesn't verify that scheduled ticking actually stops after `close()` (e.g., via a mock/scheduled executor or timing-based check). Given this is a lifecycle guarantee highlighted in the PR, a follow-up test would add confidence, though it may require more setup (e.g., an injectable/mock scheduler) than is warranted here.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@sdks/java/src/test/java/org/byteveda/taskito/autoscale/AutoscalerTest.java` around lines 25 - 41, Add a lifecycle test for Autoscaler that verifies close() stops the scheduled ticking loop, since tickResizesPoolToDepth only covers manual tick() behavior. Use the Autoscaler class and its daemon scheduler path to assert that after close() no further scheduled ticks occur, preferably by injecting or mocking the scheduler if needed, or by using a small timing-based check around close().
35-37: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick win
Shrink assertion misses `maximumPoolSize` verification.
Per `Autoscaler.resize()` (see `Autoscaler.java:56-70`), when shrinking, both `setCorePoolSize` and `setMaximumPoolSize` are updated (core first, then max). This test only asserts `getCorePoolSize()` after the shrink tick, leaving the max-pool-size shrink path unverified.
✅ Proposed addition
     depth.set(0);
     scaler.tick();
     assertEquals(1, pool.getCorePoolSize());
    +assertEquals(1, pool.getMaximumPoolSize());
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@sdks/java/src/test/java/org/byteveda/taskito/autoscale/AutoscalerTest.java` around lines 35 - 37, The shrink test in AutoscalerTest only verifies pool.getCorePoolSize() after scaler.tick(), so the maximumPoolSize shrink path in Autoscaler.resize() is untested. Update the assertion block around scaler.tick() to also check pool.getMaximumPoolSize() after a shrink, using the existing AutoscalerTest and pool setup so both core and max size updates are covered.
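The ordering mentioned in the shrink nitpick (core first, then max) matters because `ThreadPoolExecutor` rejects intermediate states where core exceeds max: `setMaximumPoolSize` throws `IllegalArgumentException` when the new max is below the current core, and on recent JDKs `setCorePoolSize` does the same when the new core is above the current max. The sketch below shows one way to order the two setters in both directions. It is not the PR's `Autoscaler.resize()`; the class name `ResizeOrderSketch` and the single-target signature are assumptions for illustration.

```java
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical helper; the real Autoscaler.resize() may be structured differently.
final class ResizeOrderSketch {
    private ResizeOrderSketch() {}

    /** Sets both core and max pool size to target (target must be >= 1). */
    static void resize(ThreadPoolExecutor pool, int target) {
        if (target > pool.getMaximumPoolSize()) {
            // Growing: raise the ceiling first so core never exceeds max.
            pool.setMaximumPoolSize(target);
            pool.setCorePoolSize(target);
        } else {
            // Shrinking or unchanged: lower core first, then bring max down to match.
            pool.setCorePoolSize(target);
            pool.setMaximumPoolSize(target);
        }
    }
}
```

A test in the spirit of the proposed addition would assert both `getCorePoolSize()` and `getMaximumPoolSize()` after growing and again after shrinking.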
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@sdks/java/src/main/java/org/byteveda/taskito/autoscale/Autoscaler.java`:
- Around line 48-54: The Autoscaler tick loop in tickSafely currently swallows
only RuntimeException, so any Error escaping tick() can permanently stop
scheduleAtFixedRate from running future executions. Update tickSafely to catch
Throwable (or otherwise ensure all failures from tick/depth/getAsLong and resize
operations are contained), and add a minimal diagnostic log in the catch so
swallowed failures are visible to operators. Use the tickSafely and tick methods
in Autoscaler as the fix points.
In `@sdks/java/src/main/java/org/byteveda/taskito/worker/Worker.java`:
- Around line 297-301: The autoscaling depth calculation in currentDepth() is
using aggregate backend.statsJson() pending/running totals, which can include
work from queues this Worker does not consume. Update Worker.currentDepth() to
scope depth to the configured queues() from the worker options, either by
reading per-queue stats or filtering statsJson() by the worker’s queue list
before summing backlog. Keep the logic localized to currentDepth() so maxWorkers
is driven only by work this worker can actually process.
- Around line 299-304: The stats supplier in Worker should not swallow
backend.statsJson()/JSON.readTree failures and return 0, because that makes a
transient read or schema error look like an empty queue. Update the lambda
around the statsJson parsing in Worker so it only returns the computed
pending+running depth on success and otherwise lets the exception propagate,
allowing the autoscaler tick to fail/skip instead of shrinking incorrectly.
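The first inline comment relies on documented `ScheduledExecutorService.scheduleAtFixedRate` behavior: if any execution of the task throws, subsequent executions are suppressed. A wrapper that contains every `Throwable` and records it keeps the loop alive. The following is a minimal sketch of that idea, not the PR's `tickSafely`; the class name `SafeTickSketch`, the failure counter, and logging to `System.err` are assumptions for illustration.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical wrapper; the real Autoscaler.tickSafely() may log differently.
final class SafeTickSketch {
    private final Runnable tick;
    private final AtomicLong failures = new AtomicLong();

    SafeTickSketch(Runnable tick) {
        this.tick = tick;
    }

    /** Runs one tick and never lets a Throwable escape to the scheduler. */
    void tickSafely() {
        try {
            tick.run();
        } catch (Throwable t) {
            // Count and report the failure so it stays visible to operators.
            failures.incrementAndGet();
            System.err.println("autoscaler tick failed: " + t);
        }
    }

    long failures() {
        return failures.get();
    }
}
```

Passing `safeTick::tickSafely` to `scheduleAtFixedRate` would then let a failed depth read skip a single tick instead of ending the schedule. Catching `Throwable` also swallows fatal errors such as `OutOfMemoryError`, which is a tradeoff to weigh.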
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: b7f89205-4169-4b73-abf6-cec1414d9170
📒 Files selected for processing (5)
- sdks/java/src/main/java/org/byteveda/taskito/autoscale/AutoscaleOptions.java
- sdks/java/src/main/java/org/byteveda/taskito/autoscale/Autoscaler.java
- sdks/java/src/main/java/org/byteveda/taskito/autoscale/package-info.java
- sdks/java/src/main/java/org/byteveda/taskito/worker/Worker.java
- sdks/java/src/test/java/org/byteveda/taskito/autoscale/AutoscalerTest.java
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
sdks/java/src/main/java/org/byteveda/taskito/worker/Worker.java (1)
277-277: 🎯 Functional Correctness | 🟠 Major | ⚡ Quick win
Snapshot queue scope before wiring the autoscaler.
`this::currentDepth` keeps reading the mutable `Builder`; if the builder is reused or `queues(...)` is called after `start()`, the live worker can autoscale against queues different from the options already sent to `backend.startWorker(...)`.
Proposed fix
     if (autoscale != null) {
    +    List<String> autoscaleQueues =
    +        queues == null ? java.util.Collections.emptyList() : new ArrayList<>(queues);
         ThreadPoolExecutor pool = new ThreadPoolExecutor(
             autoscale.minWorkers(), autoscale.maxWorkers(), 60L, TimeUnit.SECONDS,
             new LinkedBlockingQueue<>());
    -    scaler = new Autoscaler(pool, this::currentDepth, autoscale);
    +    scaler = new Autoscaler(pool, () -> currentDepth(autoscaleQueues), autoscale);
         executor = pool;
     } else {

    -private long currentDepth() {
    -    if (queues == null || queues.isEmpty()) {
    +private long currentDepth(List<String> scopedQueues) {
    +    if (scopedQueues.isEmpty()) {
             return depthFrom(backend.statsJson());
         }
         long total = 0;
    -    for (String queue : queues) {
    +    for (String queue : scopedQueues) {
             total += depthFrom(backend.statsByQueueJson(queue));
         }
         return total;
     }
Also applies to: 302-310
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@sdks/java/src/main/java/org/byteveda/taskito/worker/Worker.java` at line 277, Snapshot the queue scope before creating the Autoscaler in Worker so autoscaling uses the same queues already passed to backend.startWorker(...). Replace the live this::currentDepth reference in the autoscaler wiring with a precomputed, immutable depth supplier/value captured from the Builder at start time, and ensure Worker.start() no longer depends on mutable Builder state after launch. Apply the same fix to the related autoscaler setup in the other affected block(s) around the autoscale wiring.
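The snapshot idea in this comment can be shown in isolation: copy the builder's queue list once, at start time, and close the depth supplier over the copy. This is a sketch under assumptions, not the `Worker` code. `QueueSnapshotSketch` and the `depthOfQueue` function are hypothetical stand-ins for the builder state and the per-queue backend lookup, and the fallback to aggregate stats for an empty queue list is omitted.

```java
import java.util.List;
import java.util.function.LongSupplier;
import java.util.function.ToLongFunction;

// Hypothetical illustration of snapshotting mutable builder state at start time.
final class QueueSnapshotSketch {
    private QueueSnapshotSketch() {}

    static LongSupplier depthSupplier(List<String> builderQueues, ToLongFunction<String> depthOfQueue) {
        // Immutable copy taken once; later changes to builderQueues are not observed.
        List<String> scoped = builderQueues == null ? List.of() : List.copyOf(builderQueues);
        return () -> {
            long total = 0;
            for (String queue : scoped) {
                total += depthOfQueue.applyAsLong(queue);
            }
            return total;
        };
    }
}
```

If the caller adds a queue to the original list after the supplier is created, the supplier keeps summing only the queues captured at creation, which mirrors the options already sent to the backend.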
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@sdks/java/src/main/java/org/byteveda/taskito/worker/Worker.java`:
- Around line 320-321: The Worker stats aggregation in Worker.readTree logic is
treating missing or non-numeric pending/running values as 0, which can hide
malformed stats. Update the stats parsing path to validate the JSON schema
before computing depth, and in the method that sums pending plus running,
explicitly check that both fields exist and are numeric before converting; if
validation fails, handle it as an error instead of defaulting to zero.
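The strict parsing requested here can be sketched without a JSON library by treating the parsed stats as a map. This is an illustration under assumptions, not the `Worker` implementation: `StrictDepthSketch` is a hypothetical name, a `Map<String, ?>` stands in for the JSON tree the PR reads, and `IllegalStateException` is one possible choice of error type.

```java
import java.util.Map;

// Hypothetical strict variant of the depth calculation; a Map stands in for parsed JSON.
final class StrictDepthSketch {
    private StrictDepthSketch() {}

    /** Returns pending + running, failing loudly if either field is missing or non-integer. */
    static long depthFrom(Map<String, ?> stats) {
        return requireCount(stats, "pending") + requireCount(stats, "running");
    }

    private static long requireCount(Map<String, ?> stats, String field) {
        Object value = stats.get(field);
        if (!(value instanceof Integer) && !(value instanceof Long)) {
            // Do not default to zero: a malformed payload must not look like an empty queue.
            throw new IllegalStateException(
                "stats field '" + field + "' is missing or not an integer: " + value);
        }
        return ((Number) value).longValue();
    }
}
```

When the exception propagates out of the depth supplier, a tick wrapper that contains failures can skip that tick, so a malformed payload leaves the pool at its current size instead of shrinking it.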
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 64018755-5f88-4cae-9e37-c526d76f3f8f
📒 Files selected for processing (8)
- docs/content/docs/node/guides/core/index.mdx
- docs/content/docs/node/guides/extensibility/index.mdx
- docs/content/docs/node/guides/index.mdx
- docs/content/docs/node/guides/meta.json
- docs/content/docs/node/guides/operations/index.mdx
- docs/content/docs/node/guides/reliability/index.mdx
- sdks/java/src/main/java/org/byteveda/taskito/autoscale/Autoscaler.java
- sdks/java/src/main/java/org/byteveda/taskito/worker/Worker.java
✅ Files skipped from review due to trivial changes (6)
- docs/content/docs/node/guides/meta.json
- docs/content/docs/node/guides/core/index.mdx
- docs/content/docs/node/guides/extensibility/index.mdx
- docs/content/docs/node/guides/reliability/index.mdx
- docs/content/docs/node/guides/index.mdx
- docs/content/docs/node/guides/operations/index.mdx
🚧 Files skipped from review as they are similar to previous changes (1)
- sdks/java/src/main/java/org/byteveda/taskito/autoscale/Autoscaler.java
What
Adds `Autoscaler`, an in-process controller that resizes a worker's handler pool from live queue depth, so a single JVM worker scales its concurrency to load without external orchestration.
- `Autoscaler` (`org.byteveda.taskito.autoscale`): every `interval`, reads depth and resizes the worker's `ThreadPoolExecutor` to `ceil(depth / tasksPerWorker)`, clamped to `[minWorkers, maxWorkers]`.
  - … `setCorePoolSize > max` `IllegalArgumentException`). Shrink: lower core then max; `allowCoreThreadTimeOut(true)` lets idle threads retire. Running handlers are never interrupted.
  - `close()` stops it.
  - `desiredSize(depth, options)` exposed (pure, unit-tested) for the sizing math.
- `AutoscaleOptions`: `minWorkers` / `maxWorkers` / `tasksPerWorker` / `interval`, validated.
- `Worker`: exposes its handler pool to the autoscaler (resizable `ThreadPoolExecutor`).
Test
- `AutoscalerTest`: `desiredSize` math (ceil, clamps at min/max, zero depth), plus a live resize driving pool core/max up and back down.
- `./gradlew build` green (JDK 17 build leg + 21/25 test legs).
Summary by CodeRabbit
- `AutoscaleOptions` configuration (min/max workers, tasks per worker, and interval) plus an `autoscale(...)` builder option.