Skip to content

feat(java): producer-side batching#344

Merged
pratyush618 merged 5 commits into
masterfrom
feat/java-batching
Jul 1, 2026
Merged

feat(java): producer-side batching#344
pratyush618 merged 5 commits into
masterfrom
feat/java-batching

Conversation

@pratyush618

@pratyush618 pratyush618 commented Jul 1, 2026

Copy link
Copy Markdown
Collaborator

What

Adds Batcher<T> — a producer-side accumulator that coalesces many single enqueues into one enqueueMany call, cutting per-job round-trips for high-fan-out producers.

  • Batcher<T> (org.byteveda.taskito.batch) — buffers payloads for one task; flushes when the buffer hits maxBatch or maxDelay elapses since the first buffered item (whichever comes first).
    • add(payload) returns the job ids if that call triggered a size-flush, else empty.
    • flush() drains now; close() flushes remaining then stops the timer (try-with-resources friendly).
    • Thread-safe under a single lock; a daemon ScheduledExecutorService drives the time-based flush.
    • Validates maxBatch > 0 and maxDelay positive.

Backed by the existing enqueueMany path (which preflights predicates/gates), so batching inherits the same gating semantics as individual enqueues.

Test

BatcherTest — size-triggered flush, time-triggered flush, close() flush-on-remaining, empty flush no-op.

./gradlew build green (JDK 17 build leg + 21/25 test legs).

Summary by CodeRabbit

  • New Features

    • Added a Java batching helper that groups task payloads and enqueues them in bulk.
    • Automatically flushes when the batch is full or after a configured delay, and supports clean shutdown with a final flush.
  • Documentation

    • Added package-level guidance explaining producer-side batching behavior and flush timing.
  • Tests

    • Added JUnit coverage for capacity-based flushing, time-based flushing, and flushing remaining items on close.

Batcher buffers payloads for one task and flushes them in a single
enqueueMany when the batch fills or a delay elapses. The worker side
already batches via the worker batchSize option.
Flush on full, on close, and after the delay.
@coderabbitai

coderabbitai Bot commented Jul 1, 2026

Copy link
Copy Markdown

Review Change Stack

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 9e92f367-6fd5-48c5-8d2d-7e1d818dcda5

📥 Commits

Reviewing files that changed from the base of the PR and between 1ca1343 and a2cfa96.

📒 Files selected for processing (2)
  • sdks/java/src/main/java/org/byteveda/taskito/batch/Batcher.java
  • sdks/java/src/test/java/org/byteveda/taskito/BatcherTest.java
🚧 Files skipped from review as they are similar to previous changes (1)
  • sdks/java/src/main/java/org/byteveda/taskito/batch/Batcher.java

📝 Walkthrough

Walkthrough

This PR adds a new Batcher<T> class to the Java SDK for producer-side batching, buffering task payloads and flushing them via enqueueMany based on size or time thresholds. It includes package-level documentation and a JUnit test suite covering flush-on-full, flush-on-close, reject-after-close, and delay-based flush behaviors.

Changes

Producer-side Batching

Layer / File(s) Summary
Batcher class structure and construction
sdks/java/src/main/java/org/byteveda/taskito/batch/Batcher.java
Declares the Batcher<T> class with fields for queue/task, size/delay limits, scheduler, lock, and buffer; constructor validates maxBatch and maxDelay; static of(...) factory method added.
Add, flush, close, and scheduling logic
sdks/java/src/main/java/org/byteveda/taskito/batch/Batcher.java
Implements add() to buffer and trigger size-based flush, flush() and close() for manual/final flushes, internal flushLocked() to enqueue via enqueueMany, scheduleFlush() for delayed flush scheduling, and a daemon thread factory for the scheduler.
Package documentation and tests
sdks/java/src/main/java/org/byteveda/taskito/batch/package-info.java, sdks/java/src/test/java/org/byteveda/taskito/BatcherTest.java
Adds Javadoc contrasting producer-side vs worker-side batching, and tests verifying flush-on-full, flush-on-close, reject-after-close, and flush-after-delay behaviors using a temporary database and polling assertions.

Estimated code review effort: 3 (Moderate) | ~25 minutes

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 18.18% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly summarizes the main change: adding producer-side batching in the Java SDK.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.
✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/java-batching

Comment @coderabbitai help to get the list of available commands.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@sdks/java/src/main/java/org/byteveda/taskito/batch/Batcher.java`:
- Around line 52-59: Serialize Batcher.close() with Batcher.add() by guarding
both paths with the same lock and a closed flag. In add(T payload), check the
closed state before buffering or calling scheduleFlush() and reject/ignore new
additions after close; in close(), set closed while holding the lock, flush
under that lock, then shut down the executor so no new delayed flush can be
scheduled in between. Make sure flushLocked(), scheduleFlush(), and close() all
respect pendingFlush consistently so shutdown cannot strand buffered payloads or
trigger RejectedExecutionException after closure.
- Around line 76-86: The flushLocked() path in Batcher clears buffer before
queue.enqueueMany(task, batch) completes, so a failure can drop pending items;
change the flow to keep the buffered entries until enqueueMany succeeds, or
catch failures and restore/reschedule the batch before returning. Use the
flushLocked() method and the buffer/pendingFlush state to ensure delayed flushes
don’t lose data if queue.enqueueMany throws.
- Around line 35-41: In Batcher’s constructor, maxDelay currently passes
validation but maxDelay.toMillis() truncates any positive sub-millisecond
duration to 0, which makes the scheduler flush immediately. Update the maxDelay
handling so Batcher's delay is either rejected when it is below 1 ms or
preserved at higher precision by storing and scheduling in nanoseconds instead
of milliseconds. Make the change where maxDelayMs is assigned and wherever
Batcher schedules the flush task, so the behavior stays consistent with the
maxDelay contract.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 102803c5-ce9a-4811-ae6d-629ddc8a856c

📥 Commits

Reviewing files that changed from the base of the PR and between 8438dcd and 1ca1343.

📒 Files selected for processing (3)
  • sdks/java/src/main/java/org/byteveda/taskito/batch/Batcher.java
  • sdks/java/src/main/java/org/byteveda/taskito/batch/package-info.java
  • sdks/java/src/test/java/org/byteveda/taskito/BatcherTest.java

Comment thread sdks/java/src/main/java/org/byteveda/taskito/batch/Batcher.java Outdated
Comment thread sdks/java/src/main/java/org/byteveda/taskito/batch/Batcher.java
Comment thread sdks/java/src/main/java/org/byteveda/taskito/batch/Batcher.java Outdated
@pratyush618 pratyush618 merged commit bf40759 into master Jul 1, 2026
19 checks passed
@pratyush618 pratyush618 deleted the feat/java-batching branch July 1, 2026 04:38
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant