feat(java): producer-side batching#344
Conversation
Batcher buffers payloads for one task and flushes them in a single enqueueMany when the batch fills or a delay elapses. The worker side already batches via the worker batchSize option.
Flush on full, on close, and after the delay.
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (2)
🚧 Files skipped from review as they are similar to previous changes (1)
📝 WalkthroughWalkthroughThis PR adds a new ChangesProducer-side Batching
Estimated code review effort: 3 (Moderate) | ~25 minutes 🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@sdks/java/src/main/java/org/byteveda/taskito/batch/Batcher.java`:
- Around line 52-59: Serialize Batcher.close() with Batcher.add() by guarding
both paths with the same lock and a closed flag. In add(T payload), check the
closed state before buffering or calling scheduleFlush() and reject/ignore new
additions after close; in close(), set closed while holding the lock, flush
under that lock, then shut down the executor so no new delayed flush can be
scheduled in between. Make sure flushLocked(), scheduleFlush(), and close() all
respect pendingFlush consistently so shutdown cannot strand buffered payloads or
trigger RejectedExecutionException after closure.
- Around line 76-86: The flushLocked() path in Batcher clears buffer before
queue.enqueueMany(task, batch) completes, so a failure can drop pending items;
change the flow to keep the buffered entries until enqueueMany succeeds, or
catch failures and restore/reschedule the batch before returning. Use the
flushLocked() method and the buffer/pendingFlush state to ensure delayed flushes
don’t lose data if queue.enqueueMany throws.
- Around line 35-41: In Batcher’s constructor, maxDelay currently passes
validation but maxDelay.toMillis() truncates any positive sub-millisecond
duration to 0, which makes the scheduler flush immediately. Update the maxDelay
handling so Batcher's delay is either rejected when it is below 1 ms or
preserved at higher precision by storing and scheduling in nanoseconds instead
of milliseconds. Make the change where maxDelayMs is assigned and wherever
Batcher schedules the flush task, so the behavior stays consistent with the
maxDelay contract.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 102803c5-ce9a-4811-ae6d-629ddc8a856c
📒 Files selected for processing (3)
sdks/java/src/main/java/org/byteveda/taskito/batch/Batcher.javasdks/java/src/main/java/org/byteveda/taskito/batch/package-info.javasdks/java/src/test/java/org/byteveda/taskito/BatcherTest.java
What
Adds
Batcher<T>— a producer-side accumulator that coalesces many single enqueues into oneenqueueManycall, cutting per-job round-trips for high-fan-out producers.Batcher<T>(org.byteveda.taskito.batch) — buffers payloads for one task; flushes when the buffer hitsmaxBatchormaxDelayelapses since the first buffered item (whichever comes first).add(payload)returns the job ids if that call triggered a size-flush, else empty.flush()drains now;close()flushes remaining then stops the timer (try-with-resources friendly).ScheduledExecutorServicedrives the time-based flush.maxBatch > 0andmaxDelaypositive.Backed by the existing
enqueueManypath (which preflights predicates/gates), so batching inherits the same gating semantics as individual enqueues.Test
BatcherTest— size-triggered flush, time-triggered flush,close()flush-on-remaining, empty flush no-op../gradlew buildgreen (JDK 17 build leg + 21/25 test legs).Summary by CodeRabbit
New Features
Documentation
Tests