diff --git a/sdks/java/src/main/java/org/byteveda/taskito/DefaultTaskito.java b/sdks/java/src/main/java/org/byteveda/taskito/DefaultTaskito.java index bd730483..feb600a9 100644 --- a/sdks/java/src/main/java/org/byteveda/taskito/DefaultTaskito.java +++ b/sdks/java/src/main/java/org/byteveda/taskito/DefaultTaskito.java @@ -17,9 +17,12 @@ import java.util.function.Consumer; import java.util.function.Function; import org.byteveda.taskito.core.CoreFacade; +import org.byteveda.taskito.errors.InterceptionException; import org.byteveda.taskito.errors.PredicateRejectedException; import org.byteveda.taskito.errors.SerializationException; import org.byteveda.taskito.errors.WorkflowException; +import org.byteveda.taskito.interception.Interception; +import org.byteveda.taskito.interception.Interceptor; import org.byteveda.taskito.locks.Lock; import org.byteveda.taskito.locks.LockInfo; import org.byteveda.taskito.middleware.EnqueueContext; @@ -68,6 +71,7 @@ final class DefaultTaskito implements Taskito { private final List middleware = new CopyOnWriteArrayList<>(); private final ResourceRuntime resources = new ResourceRuntime(); private final Map> predicates = new ConcurrentHashMap<>(); + private final List interceptors = new CopyOnWriteArrayList<>(); DefaultTaskito(QueueBackend backend, Serializer serializer) { this.backend = backend; @@ -119,6 +123,12 @@ public Taskito predicate(String taskName, Predicate predicate) { return this; } + @Override + public Taskito intercept(Interceptor interceptor) { + interceptors.add(interceptor); + return this; + } + @SuppressWarnings("unchecked") private static T cast(Object value) { return (T) value; @@ -141,13 +151,29 @@ public String enqueue(String taskName, Object payload) { return dispatchEnqueue(taskName, payload, EnqueueOptions.none()); } - /** Run onEnqueue middleware, then serialize and submit the (possibly rewritten) job. */ + /** Run interceptors + onEnqueue middleware, then serialize and submit the (possibly rewritten) job. */ private String dispatchEnqueue(String taskName, Object payload, EnqueueOptions options) { + for (Interceptor interceptor : interceptors) { + Interception outcome = interceptor.intercept(taskName, payload); + if (outcome == null) { + throw new InterceptionException("interceptor returned null for task '" + taskName + "'"); + } + if (outcome instanceof Interception.Reject reject) { + throw new InterceptionException("enqueue of '" + taskName + "' rejected: " + reject.reason()); + } else if (outcome instanceof Interception.Redirect redirect) { + taskName = redirect.taskName(); + payload = redirect.payload(); + } else if (outcome instanceof Interception.Convert convert) { + payload = convert.payload(); + } + // Pass: leave taskName/payload unchanged. + } EnqueueContext context = new EnqueueContext(taskName, payload, options); for (Middleware m : middleware) { m.onEnqueue(context); } - // Gate the payload that will actually be enqueued (after middleware may have rewritten it). + // Gate the payload that will actually be enqueued (after interceptors and + // middleware may have rewritten it). gate(taskName, context.payload()); EnqueueOptions finalOptions = context.options(); if (!context.metadata().isEmpty()) { @@ -179,19 +205,44 @@ public List enqueueMany(Task task, List payloads) { @Override public List enqueueMany(Task task, List payloads, EnqueueOptions options) { - // Preflight every payload through the task's gates; a single rejection fails the whole batch. - for (T payload : payloads) { - gate(task.name(), payload); - } + // Run each payload through interceptors then gates (a single rejection fails + // the whole batch), so batch enqueue can't bypass the interception contract. + // Nothing is submitted until the single enqueueMany call, so it stays all-or-nothing. byte[][] bytes = new byte[payloads.size()][]; List perJob = new ArrayList<>(payloads.size()); for (int i = 0; i < payloads.size(); i++) { - bytes[i] = serializer.serialize(payloads.get(i)); + Object payload = interceptBatchPayload(task.name(), payloads.get(i)); + gate(task.name(), payload); + bytes[i] = serializer.serialize(payload); perJob.add(options); } return Arrays.asList(backend.enqueueMany(task.name(), bytes, encode(perJob))); } + /** + * Apply interceptors to one batched payload: {@code Convert} rewrites it, + * {@code Reject} fails the batch. {@code Redirect} is unsupported here — it would + * move an item to a different task, breaking the single-task batch — so it throws. + */ + private Object interceptBatchPayload(String taskName, Object payload) { + for (Interceptor interceptor : interceptors) { + Interception outcome = interceptor.intercept(taskName, payload); + if (outcome == null) { + throw new InterceptionException("interceptor returned null for task '" + taskName + "'"); + } + if (outcome instanceof Interception.Reject reject) { + throw new InterceptionException("enqueue of '" + taskName + "' rejected: " + reject.reason()); + } else if (outcome instanceof Interception.Convert convert) { + payload = convert.payload(); + } else if (outcome instanceof Interception.Redirect) { + throw new InterceptionException( + "interceptor Redirect is not supported for batch enqueue of task '" + taskName + "'"); + } + // Pass: leave payload unchanged. + } + return payload; + } + @Override public List enqueueAll(Task task, List payloads) { return enqueueMany(task, payloads); diff --git a/sdks/java/src/main/java/org/byteveda/taskito/Taskito.java b/sdks/java/src/main/java/org/byteveda/taskito/Taskito.java index ab59e5a3..2c34190c 100644 --- a/sdks/java/src/main/java/org/byteveda/taskito/Taskito.java +++ b/sdks/java/src/main/java/org/byteveda/taskito/Taskito.java @@ -14,6 +14,7 @@ import java.util.function.Consumer; import java.util.function.Function; import org.byteveda.taskito.errors.ConfigurationException; +import org.byteveda.taskito.interception.Interceptor; import org.byteveda.taskito.internal.JniQueueBackend; import org.byteveda.taskito.locks.Lock; import org.byteveda.taskito.locks.LockInfo; @@ -82,6 +83,12 @@ static Builder builder() { */ Taskito predicate(String taskName, Predicate predicate); + /** + * Register an interceptor that may convert, redirect, or reject each enqueue + * before it is serialized (see {@link Interceptor}). Returns {@code this}. + */ + Taskito intercept(Interceptor interceptor); + // ── Producer ──────────────────────────────────────────────────── /** Enqueue a typed payload using the task's default options; returns the job id. */ diff --git a/sdks/java/src/main/java/org/byteveda/taskito/errors/InterceptionException.java b/sdks/java/src/main/java/org/byteveda/taskito/errors/InterceptionException.java new file mode 100644 index 00000000..c6699458 --- /dev/null +++ b/sdks/java/src/main/java/org/byteveda/taskito/errors/InterceptionException.java @@ -0,0 +1,10 @@ +package org.byteveda.taskito.errors; + +import org.byteveda.taskito.TaskitoException; + +/** An interceptor rejected an enqueue, so no job was created. */ +public class InterceptionException extends TaskitoException { + public InterceptionException(String message) { + super(message); + } +} diff --git a/sdks/java/src/main/java/org/byteveda/taskito/interception/Interception.java b/sdks/java/src/main/java/org/byteveda/taskito/interception/Interception.java new file mode 100644 index 00000000..615299d4 --- /dev/null +++ b/sdks/java/src/main/java/org/byteveda/taskito/interception/Interception.java @@ -0,0 +1,43 @@ +package org.byteveda.taskito.interception; + +/** + * The outcome of an {@link Interceptor}: one of the interception strategies. + * + *
    + *
  • {@link Pass} — enqueue the payload unchanged. + *
  • {@link Convert} — replace the payload (e.g. with a proxy reference). + *
  • {@link Redirect} — enqueue a different task (and payload) instead. + *
  • {@link Reject} — block the enqueue with a reason. + *
+ */ +public sealed interface Interception + permits Interception.Pass, Interception.Convert, Interception.Redirect, Interception.Reject { + + /** Enqueue the original payload unchanged. */ + record Pass() implements Interception {} + + /** Enqueue {@code payload} in place of the original. */ + record Convert(Object payload) implements Interception {} + + /** Enqueue {@code taskName} with {@code payload} instead of the original task. */ + record Redirect(String taskName, Object payload) implements Interception {} + + /** Block the enqueue; {@code reason} is surfaced on the thrown exception. */ + record Reject(String reason) implements Interception {} + + static Interception pass() { + return new Pass(); + } + + static Interception convert(Object payload) { + return new Convert(payload); + } + + static Interception redirect(String taskName, Object payload) { + return new Redirect(taskName, payload); + } + + static Interception reject(String reason) { + return new Reject(reason); + } +} diff --git a/sdks/java/src/main/java/org/byteveda/taskito/interception/Interceptor.java b/sdks/java/src/main/java/org/byteveda/taskito/interception/Interceptor.java new file mode 100644 index 00000000..8bd94121 --- /dev/null +++ b/sdks/java/src/main/java/org/byteveda/taskito/interception/Interceptor.java @@ -0,0 +1,10 @@ +package org.byteveda.taskito.interception; + +/** + * Inspects an enqueue on the producer and decides what to do with it (see + * {@link Interception}). Runs synchronously before serialization; keep it fast. + */ +@FunctionalInterface +public interface Interceptor { + Interception intercept(String taskName, Object payload); +} diff --git a/sdks/java/src/main/java/org/byteveda/taskito/interception/package-info.java b/sdks/java/src/main/java/org/byteveda/taskito/interception/package-info.java new file mode 100644 index 00000000..5eb6c500 --- /dev/null +++ b/sdks/java/src/main/java/org/byteveda/taskito/interception/package-info.java @@ -0,0 +1,10 @@ +/** + * Producer-side argument interception: an {@link org.byteveda.taskito.interception.Interceptor} + * inspects each enqueue and returns an {@link org.byteveda.taskito.interception.Interception} + * — pass it through, convert the payload (e.g. to a + * {@link org.byteveda.taskito.proxies.ProxyRef}), redirect it to another task, + * or reject it. Register with {@code Taskito.intercept(...)}. Unlike Python's + * implicit arg-walking, interception here is an explicit transform over the + * typed payload. + */ +package org.byteveda.taskito.interception; diff --git a/sdks/java/src/test/java/org/byteveda/taskito/InterceptionTest.java b/sdks/java/src/test/java/org/byteveda/taskito/InterceptionTest.java new file mode 100644 index 00000000..b78999a0 --- /dev/null +++ b/sdks/java/src/test/java/org/byteveda/taskito/InterceptionTest.java @@ -0,0 +1,129 @@ +package org.byteveda.taskito; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.nio.file.Path; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.byteveda.taskito.errors.InterceptionException; +import org.byteveda.taskito.interception.Interception; +import org.byteveda.taskito.task.Task; +import org.byteveda.taskito.worker.Worker; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.io.TempDir; + +class InterceptionTest { + + private static final Task A = Task.of("ic.a", Integer.class); + private static final Task B = Task.of("ic.b", Integer.class); + + @Test + @Timeout(30) + void convertsPayload(@TempDir Path dir) throws Exception { + try (Taskito queue = + Taskito.builder().url(dir.resolve("ic.db").toString()).open()) { + queue.intercept((task, payload) -> Interception.convert((Integer) payload * 10)); + AtomicInteger seen = new AtomicInteger(); + CountDownLatch ran = new CountDownLatch(1); + try (Worker worker = queue.worker() + .handle(A, p -> { + seen.set(p); + ran.countDown(); + return p; + }) + .start()) { + queue.enqueue(A, 5); + assertTrue(ran.await(20, TimeUnit.SECONDS)); + assertEquals(50, seen.get()); + } + } + } + + @Test + @Timeout(30) + void redirectsTask(@TempDir Path dir) throws Exception { + try (Taskito queue = + Taskito.builder().url(dir.resolve("ic.db").toString()).open()) { + queue.intercept((task, payload) -> + task.equals("ic.a") ? Interception.redirect("ic.b", payload) : Interception.pass()); + AtomicInteger aRan = new AtomicInteger(); + CountDownLatch bRan = new CountDownLatch(1); + try (Worker worker = queue.worker() + .handle(A, p -> aRan.incrementAndGet()) + .handle(B, p -> { + bRan.countDown(); + return p; + }) + .start()) { + queue.enqueue(A, 1); + assertTrue(bRan.await(20, TimeUnit.SECONDS)); + assertEquals(0, aRan.get()); + } + } + } + + @Test + void rejectsEnqueue(@TempDir Path dir) { + try (Taskito queue = + Taskito.builder().url(dir.resolve("ic.db").toString()).open()) { + queue.intercept((task, payload) -> Interception.reject("blocked")); + assertThrows(InterceptionException.class, () -> queue.enqueue(A, 1)); + } + } + + @Test + void rejectsNullInterceptorResult(@TempDir Path dir) { + try (Taskito queue = + Taskito.builder().url(dir.resolve("ic.db").toString()).open()) { + queue.intercept((task, payload) -> null); + assertThrows(InterceptionException.class, () -> queue.enqueue(A, 1)); + } + } + + @Test + @Timeout(30) + void batchEnqueueConvertsPayloads(@TempDir Path dir) throws Exception { + try (Taskito queue = + Taskito.builder().url(dir.resolve("ic.db").toString()).open()) { + queue.intercept((task, payload) -> Interception.convert((Integer) payload * 10)); + AtomicInteger sum = new AtomicInteger(); + CountDownLatch ran = new CountDownLatch(3); + try (Worker worker = queue.worker() + .handle(A, p -> { + sum.addAndGet(p); + ran.countDown(); + return p; + }) + .start()) { + queue.enqueueMany(A, List.of(1, 2, 3)); + assertTrue(ran.await(20, TimeUnit.SECONDS)); + assertEquals(60, sum.get()); // (1+2+3)*10 — interception ran on the batch + } + } + } + + @Test + void batchEnqueueRejectsViaInterceptor(@TempDir Path dir) { + try (Taskito queue = + Taskito.builder().url(dir.resolve("ic.db").toString()).open()) { + queue.intercept( + (task, payload) -> (Integer) payload < 0 ? Interception.reject("negative") : Interception.pass()); + assertThrows(InterceptionException.class, () -> queue.enqueueMany(A, List.of(1, -1, 2))); + } + } + + @Test + void batchEnqueueRejectsRedirect(@TempDir Path dir) { + try (Taskito queue = + Taskito.builder().url(dir.resolve("ic.db").toString()).open()) { + queue.intercept((task, payload) -> Interception.redirect("ic.b", payload)); + // Redirect can't move an item out of a single-task batch — fail fast. + assertThrows(InterceptionException.class, () -> queue.enqueueMany(A, List.of(1, 2))); + } + } +}