From 0bfd9a947243a37a739afb0c59c8e88bedaade01 Mon Sep 17 00:00:00 2001 From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com> Date: Sun, 28 Jun 2026 19:25:33 +0530 Subject: [PATCH 1/5] feat(java): producer-side batching Batcher buffers payloads for one task and flushes them in a single enqueueMany when the batch fills or a delay elapses. The worker side already batches via the worker batchSize option. --- .../org/byteveda/taskito/batch/Batcher.java | 100 ++++++++++++++++++ .../byteveda/taskito/batch/package-info.java | 7 ++ 2 files changed, 107 insertions(+) create mode 100644 sdks/java/src/main/java/org/byteveda/taskito/batch/Batcher.java create mode 100644 sdks/java/src/main/java/org/byteveda/taskito/batch/package-info.java diff --git a/sdks/java/src/main/java/org/byteveda/taskito/batch/Batcher.java b/sdks/java/src/main/java/org/byteveda/taskito/batch/Batcher.java new file mode 100644 index 00000000..223ad29b --- /dev/null +++ b/sdks/java/src/main/java/org/byteveda/taskito/batch/Batcher.java @@ -0,0 +1,100 @@ +package org.byteveda.taskito.batch; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import org.byteveda.taskito.Taskito; +import org.byteveda.taskito.task.Task; + +/** + * Buffers payloads for one task and enqueues them in a single + * {@code enqueueMany} call when the buffer reaches {@code maxBatch} or + * {@code maxDelay} elapses since the first buffered item. Thread-safe; + * {@link #close()} flushes what remains. Use with try-with-resources. 
+ * + * @param <T> the task's payload type + */ +public final class Batcher implements AutoCloseable { + private final Taskito queue; + private final Task task; + private final int maxBatch; + private final long maxDelayMs; + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(Batcher::daemon); + private final Object lock = new Object(); + private final List buffer = new ArrayList<>(); + private ScheduledFuture pendingFlush; + + public Batcher(Taskito queue, Task task, int maxBatch, Duration maxDelay) { + if (maxBatch <= 0) { + throw new IllegalArgumentException("maxBatch must be > 0"); + } + if (maxDelay == null || maxDelay.isNegative() || maxDelay.isZero()) { + throw new IllegalArgumentException("maxDelay must be positive"); + } + this.queue = queue; + this.task = task; + this.maxBatch = maxBatch; + this.maxDelayMs = maxDelay.toMillis(); + } + + public static Batcher of(Taskito queue, Task task, int maxBatch, Duration maxDelay) { + return new Batcher<>(queue, task, maxBatch, maxDelay); + } + + /** + * Buffer {@code payload}. Returns the job ids if this call triggered a flush + * (the buffer reached {@code maxBatch}), otherwise an empty list. + */ + public List add(T payload) { + synchronized (lock) { + buffer.add(payload); + if (buffer.size() >= maxBatch) { + return flushLocked(); + } + scheduleFlush(); + return List.of(); + } + } + + /** Enqueue any buffered payloads now; returns their job ids (empty if none). 
*/ + public List flush() { + synchronized (lock) { + return flushLocked(); + } + } + + @Override + public void close() { + flush(); + scheduler.shutdownNow(); + } + + private List flushLocked() { + if (pendingFlush != null) { + pendingFlush.cancel(false); + pendingFlush = null; + } + if (buffer.isEmpty()) { + return List.of(); + } + List batch = new ArrayList<>(buffer); + buffer.clear(); + return queue.enqueueMany(task, batch); + } + + private void scheduleFlush() { + if (pendingFlush == null) { + pendingFlush = scheduler.schedule(this::flush, maxDelayMs, TimeUnit.MILLISECONDS); + } + } + + private static Thread daemon(Runnable runnable) { + Thread thread = new Thread(runnable, "taskito-batcher"); + thread.setDaemon(true); + return thread; + } +} diff --git a/sdks/java/src/main/java/org/byteveda/taskito/batch/package-info.java b/sdks/java/src/main/java/org/byteveda/taskito/batch/package-info.java new file mode 100644 index 00000000..cd8c6deb --- /dev/null +++ b/sdks/java/src/main/java/org/byteveda/taskito/batch/package-info.java @@ -0,0 +1,7 @@ +/** + * Producer-side batching: {@link org.byteveda.taskito.batch.Batcher} buffers + * payloads and flushes them in one {@code enqueueMany} when the batch fills or a + * delay elapses. The worker side already batches via the worker's + * {@code batchSize} option (which drives the core batch dequeue). + */ +package org.byteveda.taskito.batch; From 1ca1343f82947ed0d1befd9735555b21b6d7f982 Mon Sep 17 00:00:00 2001 From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com> Date: Sun, 28 Jun 2026 19:25:34 +0530 Subject: [PATCH 2/5] test(java): cover the producer batcher Flush on full, on close, and after the delay. 
--- .../org/byteveda/taskito/BatcherTest.java | 59 +++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100644 sdks/java/src/test/java/org/byteveda/taskito/BatcherTest.java diff --git a/sdks/java/src/test/java/org/byteveda/taskito/BatcherTest.java b/sdks/java/src/test/java/org/byteveda/taskito/BatcherTest.java new file mode 100644 index 00000000..90fe01b7 --- /dev/null +++ b/sdks/java/src/test/java/org/byteveda/taskito/BatcherTest.java @@ -0,0 +1,59 @@ +package org.byteveda.taskito; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.nio.file.Path; +import java.time.Duration; +import java.util.List; +import org.byteveda.taskito.batch.Batcher; +import org.byteveda.taskito.task.Task; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.io.TempDir; + +class BatcherTest { + + private static final Task TASK = Task.of("b.task", Integer.class); + + @Test + void flushesWhenBatchFull(@TempDir Path dir) { + try (Taskito queue = + Taskito.builder().url(dir.resolve("b.db").toString()).open(); + Batcher batcher = Batcher.of(queue, TASK, 3, Duration.ofSeconds(60))) { + assertTrue(batcher.add(1).isEmpty()); + assertTrue(batcher.add(2).isEmpty()); + List ids = batcher.add(3); + assertEquals(3, ids.size()); + assertEquals(3, queue.stats().pending); + } + } + + @Test + void flushesRemainderOnClose(@TempDir Path dir) { + try (Taskito queue = + Taskito.builder().url(dir.resolve("b.db").toString()).open()) { + try (Batcher batcher = Batcher.of(queue, TASK, 100, Duration.ofSeconds(60))) { + batcher.add(1); + batcher.add(2); + } + assertEquals(2, queue.stats().pending); + } + } + + @Test + @Timeout(30) + void flushesAfterDelay(@TempDir Path dir) throws Exception { + try (Taskito queue = + Taskito.builder().url(dir.resolve("b.db").toString()).open(); + Batcher batcher = Batcher.of(queue, TASK, 100, Duration.ofMillis(200))) { + 
batcher.add(1); + batcher.add(2); + long deadline = System.nanoTime() + Duration.ofSeconds(10).toNanos(); + while (queue.stats().pending < 2 && System.nanoTime() < deadline) { + Thread.sleep(50); + } + assertEquals(2, queue.stats().pending); + } + } +} From ca1952f2e0fc232a5c54e4c3f00c3b04838e8c56 Mon Sep 17 00:00:00 2001 From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com> Date: Wed, 1 Jul 2026 09:49:07 +0530 Subject: [PATCH 3/5] fix(java): batcher keeps buffer until enqueue succeeds --- .../src/main/java/org/byteveda/taskito/batch/Batcher.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sdks/java/src/main/java/org/byteveda/taskito/batch/Batcher.java b/sdks/java/src/main/java/org/byteveda/taskito/batch/Batcher.java index 223ad29b..f6d0bdaf 100644 --- a/sdks/java/src/main/java/org/byteveda/taskito/batch/Batcher.java +++ b/sdks/java/src/main/java/org/byteveda/taskito/batch/Batcher.java @@ -82,8 +82,11 @@ private List flushLocked() { return List.of(); } List batch = new ArrayList<>(buffer); + // Enqueue before clearing: if enqueueMany throws, the buffer keeps the + // payloads so a delayed-flush failure doesn't silently drop them. 
+ List ids = queue.enqueueMany(task, batch); buffer.clear(); - return queue.enqueueMany(task, batch); + return ids; } private void scheduleFlush() { From 7558c7dcbee79789ad12eb54f82831e7b8b3822e Mon Sep 17 00:00:00 2001 From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com> Date: Wed, 1 Jul 2026 09:49:53 +0530 Subject: [PATCH 4/5] fix(java): serialize batcher close with add --- .../java/org/byteveda/taskito/batch/Batcher.java | 14 +++++++++++++- .../java/org/byteveda/taskito/BatcherTest.java | 11 +++++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/sdks/java/src/main/java/org/byteveda/taskito/batch/Batcher.java b/sdks/java/src/main/java/org/byteveda/taskito/batch/Batcher.java index f6d0bdaf..34793b57 100644 --- a/sdks/java/src/main/java/org/byteveda/taskito/batch/Batcher.java +++ b/sdks/java/src/main/java/org/byteveda/taskito/batch/Batcher.java @@ -27,6 +27,7 @@ public final class Batcher implements AutoCloseable { private final Object lock = new Object(); private final List buffer = new ArrayList<>(); private ScheduledFuture pendingFlush; + private boolean closed; // guarded by lock public Batcher(Taskito queue, Task task, int maxBatch, Duration maxDelay) { if (maxBatch <= 0) { @@ -51,6 +52,9 @@ public static Batcher of(Taskito queue, Task task, int maxBatch, Durat */ public List add(T payload) { synchronized (lock) { + if (closed) { + throw new IllegalStateException("batcher is closed"); + } buffer.add(payload); if (buffer.size() >= maxBatch) { return flushLocked(); @@ -69,7 +73,15 @@ public List flush() { @Override public void close() { - flush(); + synchronized (lock) { + if (closed) { + return; + } + closed = true; + // Flush and mark closed atomically so no add() can slip in and + // schedule a delayed flush that shutdownNow() would then cancel. 
+ flushLocked(); + } scheduler.shutdownNow(); } diff --git a/sdks/java/src/test/java/org/byteveda/taskito/BatcherTest.java b/sdks/java/src/test/java/org/byteveda/taskito/BatcherTest.java index 90fe01b7..4f77aa5a 100644 --- a/sdks/java/src/test/java/org/byteveda/taskito/BatcherTest.java +++ b/sdks/java/src/test/java/org/byteveda/taskito/BatcherTest.java @@ -1,6 +1,7 @@ package org.byteveda.taskito; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import java.nio.file.Path; @@ -41,6 +42,16 @@ void flushesRemainderOnClose(@TempDir Path dir) { } } + @Test + void addAfterCloseThrows(@TempDir Path dir) { + try (Taskito queue = + Taskito.builder().url(dir.resolve("b.db").toString()).open()) { + Batcher batcher = Batcher.of(queue, TASK, 100, Duration.ofSeconds(60)); + batcher.close(); + assertThrows(IllegalStateException.class, () -> batcher.add(1)); + } + } + @Test @Timeout(30) void flushesAfterDelay(@TempDir Path dir) throws Exception { From a2cfa96d76770594584a549591fb986fbf7d5d1a Mon Sep 17 00:00:00 2001 From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com> Date: Wed, 1 Jul 2026 09:51:07 +0530 Subject: [PATCH 5/5] fix(java): preserve sub-millisecond batcher maxDelay --- .../src/main/java/org/byteveda/taskito/batch/Batcher.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/sdks/java/src/main/java/org/byteveda/taskito/batch/Batcher.java b/sdks/java/src/main/java/org/byteveda/taskito/batch/Batcher.java index 34793b57..ded2a49e 100644 --- a/sdks/java/src/main/java/org/byteveda/taskito/batch/Batcher.java +++ b/sdks/java/src/main/java/org/byteveda/taskito/batch/Batcher.java @@ -22,7 +22,7 @@ public final class Batcher implements AutoCloseable { private final Taskito queue; private final Task task; private final int maxBatch; - private final long maxDelayMs; + private final long maxDelayNanos; private 
final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(Batcher::daemon); private final Object lock = new Object(); private final List buffer = new ArrayList<>(); @@ -39,7 +39,9 @@ public Batcher(Taskito queue, Task task, int maxBatch, Duration maxDelay) { this.queue = queue; this.task = task; this.maxBatch = maxBatch; - this.maxDelayMs = maxDelay.toMillis(); + // Nanoseconds, not millis: toMillis() would truncate a sub-millisecond + // delay to 0 and flush eagerly instead of honoring the requested delay. + this.maxDelayNanos = maxDelay.toNanos(); } public static Batcher of(Taskito queue, Task task, int maxBatch, Duration maxDelay) { @@ -103,7 +105,7 @@ private List flushLocked() { private void scheduleFlush() { if (pendingFlush == null) { - pendingFlush = scheduler.schedule(this::flush, maxDelayMs, TimeUnit.MILLISECONDS); + pendingFlush = scheduler.schedule(this::flush, maxDelayNanos, TimeUnit.NANOSECONDS); } }