-
Notifications
You must be signed in to change notification settings - Fork 0
feat(java): add enqueue interception #350
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
10 changes: 10 additions & 0 deletions
10
sdks/java/src/main/java/org/byteveda/taskito/errors/InterceptionException.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,10 @@ | ||
| package org.byteveda.taskito.errors; | ||
|
|
||
| import org.byteveda.taskito.TaskitoException; | ||
|
|
||
| /** An interceptor rejected an enqueue, so no job was created. */ | ||
| public class InterceptionException extends TaskitoException { | ||
| public InterceptionException(String message) { | ||
| super(message); | ||
| } | ||
| } |
43 changes: 43 additions & 0 deletions
43
sdks/java/src/main/java/org/byteveda/taskito/interception/Interception.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,43 @@ | ||
| package org.byteveda.taskito.interception; | ||
|
|
||
| /** | ||
| * The outcome of an {@link Interceptor}: one of the interception strategies. | ||
| * | ||
| * <ul> | ||
| * <li>{@link Pass} — enqueue the payload unchanged. | ||
| * <li>{@link Convert} — replace the payload (e.g. with a proxy reference). | ||
| * <li>{@link Redirect} — enqueue a different task (and payload) instead. | ||
| * <li>{@link Reject} — block the enqueue with a reason. | ||
| * </ul> | ||
| */ | ||
| public sealed interface Interception | ||
| permits Interception.Pass, Interception.Convert, Interception.Redirect, Interception.Reject { | ||
|
|
||
| /** Enqueue the original payload unchanged. */ | ||
| record Pass() implements Interception {} | ||
|
|
||
| /** Enqueue {@code payload} in place of the original. */ | ||
| record Convert(Object payload) implements Interception {} | ||
|
|
||
| /** Enqueue {@code taskName} with {@code payload} instead of the original task. */ | ||
| record Redirect(String taskName, Object payload) implements Interception {} | ||
|
|
||
| /** Block the enqueue; {@code reason} is surfaced on the thrown exception. */ | ||
| record Reject(String reason) implements Interception {} | ||
|
|
||
| static Interception pass() { | ||
| return new Pass(); | ||
| } | ||
|
|
||
| static Interception convert(Object payload) { | ||
| return new Convert(payload); | ||
| } | ||
|
|
||
| static Interception redirect(String taskName, Object payload) { | ||
| return new Redirect(taskName, payload); | ||
| } | ||
|
|
||
| static Interception reject(String reason) { | ||
| return new Reject(reason); | ||
| } | ||
| } |
10 changes: 10 additions & 0 deletions
10
sdks/java/src/main/java/org/byteveda/taskito/interception/Interceptor.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,10 @@ | ||
| package org.byteveda.taskito.interception; | ||
|
|
||
| /** | ||
| * Inspects an enqueue on the producer and decides what to do with it (see | ||
| * {@link Interception}). Runs synchronously before serialization; keep it fast. | ||
| */ | ||
| @FunctionalInterface | ||
| public interface Interceptor { | ||
| Interception intercept(String taskName, Object payload); | ||
| } |
10 changes: 10 additions & 0 deletions
10
sdks/java/src/main/java/org/byteveda/taskito/interception/package-info.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,10 @@ | ||
| /** | ||
| * Producer-side argument interception: an {@link org.byteveda.taskito.interception.Interceptor} | ||
| * inspects each enqueue and returns an {@link org.byteveda.taskito.interception.Interception} | ||
| * — pass it through, convert the payload (e.g. to a | ||
| * {@link org.byteveda.taskito.proxies.ProxyRef}), redirect it to another task, | ||
| * or reject it. Register with {@code Taskito.intercept(...)}. Unlike Python's | ||
| * implicit arg-walking, interception here is an explicit transform over the | ||
| * typed payload. | ||
| */ | ||
| package org.byteveda.taskito.interception; |
129 changes: 129 additions & 0 deletions
129
sdks/java/src/test/java/org/byteveda/taskito/InterceptionTest.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,129 @@ | ||
| package org.byteveda.taskito; | ||
|
|
||
| import static org.junit.jupiter.api.Assertions.assertEquals; | ||
| import static org.junit.jupiter.api.Assertions.assertThrows; | ||
| import static org.junit.jupiter.api.Assertions.assertTrue; | ||
|
|
||
| import java.nio.file.Path; | ||
| import java.util.List; | ||
| import java.util.concurrent.CountDownLatch; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
| import org.byteveda.taskito.errors.InterceptionException; | ||
| import org.byteveda.taskito.interception.Interception; | ||
| import org.byteveda.taskito.task.Task; | ||
| import org.byteveda.taskito.worker.Worker; | ||
| import org.junit.jupiter.api.Test; | ||
| import org.junit.jupiter.api.Timeout; | ||
| import org.junit.jupiter.api.io.TempDir; | ||
|
|
||
| class InterceptionTest { | ||
|
|
||
| private static final Task<Integer> A = Task.of("ic.a", Integer.class); | ||
| private static final Task<Integer> B = Task.of("ic.b", Integer.class); | ||
|
|
||
| @Test | ||
| @Timeout(30) | ||
| void convertsPayload(@TempDir Path dir) throws Exception { | ||
| try (Taskito queue = | ||
| Taskito.builder().url(dir.resolve("ic.db").toString()).open()) { | ||
| queue.intercept((task, payload) -> Interception.convert((Integer) payload * 10)); | ||
| AtomicInteger seen = new AtomicInteger(); | ||
| CountDownLatch ran = new CountDownLatch(1); | ||
| try (Worker worker = queue.worker() | ||
| .handle(A, p -> { | ||
| seen.set(p); | ||
| ran.countDown(); | ||
| return p; | ||
| }) | ||
| .start()) { | ||
| queue.enqueue(A, 5); | ||
| assertTrue(ran.await(20, TimeUnit.SECONDS)); | ||
| assertEquals(50, seen.get()); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| @Test | ||
| @Timeout(30) | ||
| void redirectsTask(@TempDir Path dir) throws Exception { | ||
| try (Taskito queue = | ||
| Taskito.builder().url(dir.resolve("ic.db").toString()).open()) { | ||
| queue.intercept((task, payload) -> | ||
| task.equals("ic.a") ? Interception.redirect("ic.b", payload) : Interception.pass()); | ||
| AtomicInteger aRan = new AtomicInteger(); | ||
| CountDownLatch bRan = new CountDownLatch(1); | ||
| try (Worker worker = queue.worker() | ||
| .handle(A, p -> aRan.incrementAndGet()) | ||
| .handle(B, p -> { | ||
| bRan.countDown(); | ||
| return p; | ||
| }) | ||
| .start()) { | ||
| queue.enqueue(A, 1); | ||
| assertTrue(bRan.await(20, TimeUnit.SECONDS)); | ||
| assertEquals(0, aRan.get()); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| @Test | ||
| void rejectsEnqueue(@TempDir Path dir) { | ||
| try (Taskito queue = | ||
| Taskito.builder().url(dir.resolve("ic.db").toString()).open()) { | ||
| queue.intercept((task, payload) -> Interception.reject("blocked")); | ||
| assertThrows(InterceptionException.class, () -> queue.enqueue(A, 1)); | ||
| } | ||
| } | ||
|
|
||
| @Test | ||
| void rejectsNullInterceptorResult(@TempDir Path dir) { | ||
| try (Taskito queue = | ||
| Taskito.builder().url(dir.resolve("ic.db").toString()).open()) { | ||
| queue.intercept((task, payload) -> null); | ||
| assertThrows(InterceptionException.class, () -> queue.enqueue(A, 1)); | ||
| } | ||
| } | ||
|
|
||
| @Test | ||
| @Timeout(30) | ||
| void batchEnqueueConvertsPayloads(@TempDir Path dir) throws Exception { | ||
| try (Taskito queue = | ||
| Taskito.builder().url(dir.resolve("ic.db").toString()).open()) { | ||
| queue.intercept((task, payload) -> Interception.convert((Integer) payload * 10)); | ||
| AtomicInteger sum = new AtomicInteger(); | ||
| CountDownLatch ran = new CountDownLatch(3); | ||
| try (Worker worker = queue.worker() | ||
| .handle(A, p -> { | ||
| sum.addAndGet(p); | ||
| ran.countDown(); | ||
| return p; | ||
| }) | ||
| .start()) { | ||
| queue.enqueueMany(A, List.of(1, 2, 3)); | ||
| assertTrue(ran.await(20, TimeUnit.SECONDS)); | ||
| assertEquals(60, sum.get()); // (1+2+3)*10 — interception ran on the batch | ||
| } | ||
| } | ||
| } | ||
|
|
||
| @Test | ||
| void batchEnqueueRejectsViaInterceptor(@TempDir Path dir) { | ||
| try (Taskito queue = | ||
| Taskito.builder().url(dir.resolve("ic.db").toString()).open()) { | ||
| queue.intercept( | ||
| (task, payload) -> (Integer) payload < 0 ? Interception.reject("negative") : Interception.pass()); | ||
| assertThrows(InterceptionException.class, () -> queue.enqueueMany(A, List.of(1, -1, 2))); | ||
| } | ||
| } | ||
|
|
||
| @Test | ||
| void batchEnqueueRejectsRedirect(@TempDir Path dir) { | ||
| try (Taskito queue = | ||
| Taskito.builder().url(dir.resolve("ic.db").toString()).open()) { | ||
| queue.intercept((task, payload) -> Interception.redirect("ic.b", payload)); | ||
| // Redirect can't move an item out of a single-task batch — fail fast. | ||
| assertThrows(InterceptionException.class, () -> queue.enqueueMany(A, List.of(1, 2))); | ||
| } | ||
| } | ||
| } |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.