Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 58 additions & 7 deletions sdks/java/src/main/java/org/byteveda/taskito/DefaultTaskito.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@
import java.util.function.Consumer;
import java.util.function.Function;
import org.byteveda.taskito.core.CoreFacade;
import org.byteveda.taskito.errors.InterceptionException;
import org.byteveda.taskito.errors.PredicateRejectedException;
import org.byteveda.taskito.errors.SerializationException;
import org.byteveda.taskito.errors.WorkflowException;
import org.byteveda.taskito.interception.Interception;
import org.byteveda.taskito.interception.Interceptor;
import org.byteveda.taskito.locks.Lock;
import org.byteveda.taskito.locks.LockInfo;
import org.byteveda.taskito.middleware.EnqueueContext;
Expand Down Expand Up @@ -68,6 +71,7 @@ final class DefaultTaskito implements Taskito {
private final List<Middleware> middleware = new CopyOnWriteArrayList<>();
private final ResourceRuntime resources = new ResourceRuntime();
private final Map<String, List<Predicate>> predicates = new ConcurrentHashMap<>();
private final List<Interceptor> interceptors = new CopyOnWriteArrayList<>();

DefaultTaskito(QueueBackend backend, Serializer serializer) {
this.backend = backend;
Expand Down Expand Up @@ -119,6 +123,12 @@ public Taskito predicate(String taskName, Predicate predicate) {
return this;
}

@Override
public Taskito intercept(Interceptor interceptor) {
interceptors.add(interceptor);
return this;
}

@SuppressWarnings("unchecked")
private static <T> T cast(Object value) {
return (T) value;
Expand All @@ -141,13 +151,29 @@ public String enqueue(String taskName, Object payload) {
return dispatchEnqueue(taskName, payload, EnqueueOptions.none());
}

/** Run onEnqueue middleware, then serialize and submit the (possibly rewritten) job. */
/** Run interceptors + onEnqueue middleware, then serialize and submit the (possibly rewritten) job. */
private String dispatchEnqueue(String taskName, Object payload, EnqueueOptions options) {
for (Interceptor interceptor : interceptors) {
Interception outcome = interceptor.intercept(taskName, payload);
if (outcome == null) {
throw new InterceptionException("interceptor returned null for task '" + taskName + "'");
}
if (outcome instanceof Interception.Reject reject) {
throw new InterceptionException("enqueue of '" + taskName + "' rejected: " + reject.reason());
} else if (outcome instanceof Interception.Redirect redirect) {
taskName = redirect.taskName();
payload = redirect.payload();
} else if (outcome instanceof Interception.Convert convert) {
payload = convert.payload();
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
// Pass: leave taskName/payload unchanged.
}
EnqueueContext context = new EnqueueContext(taskName, payload, options);
for (Middleware m : middleware) {
m.onEnqueue(context);
}
// Gate the payload that will actually be enqueued (after middleware may have rewritten it).
// Gate the payload that will actually be enqueued (after interceptors and
// middleware may have rewritten it).
Comment thread
coderabbitai[bot] marked this conversation as resolved.
gate(taskName, context.payload());
EnqueueOptions finalOptions = context.options();
if (!context.metadata().isEmpty()) {
Expand Down Expand Up @@ -179,19 +205,44 @@ public <T> List<String> enqueueMany(Task<T> task, List<T> payloads) {

@Override
public <T> List<String> enqueueMany(Task<T> task, List<T> payloads, EnqueueOptions options) {
// Preflight every payload through the task's gates; a single rejection fails the whole batch.
for (T payload : payloads) {
gate(task.name(), payload);
}
// Run each payload through interceptors then gates (a single rejection fails
// the whole batch), so batch enqueue can't bypass the interception contract.
// Nothing is submitted until the single enqueueMany call, so it stays all-or-nothing.
byte[][] bytes = new byte[payloads.size()][];
List<EnqueueOptions> perJob = new ArrayList<>(payloads.size());
for (int i = 0; i < payloads.size(); i++) {
bytes[i] = serializer.serialize(payloads.get(i));
Object payload = interceptBatchPayload(task.name(), payloads.get(i));
gate(task.name(), payload);
bytes[i] = serializer.serialize(payload);
perJob.add(options);
}
return Arrays.asList(backend.enqueueMany(task.name(), bytes, encode(perJob)));
}

/**
* Apply interceptors to one batched payload: {@code Convert} rewrites it,
* {@code Reject} fails the batch. {@code Redirect} is unsupported here — it would
* move an item to a different task, breaking the single-task batch — so it throws.
*/
private Object interceptBatchPayload(String taskName, Object payload) {
for (Interceptor interceptor : interceptors) {
Interception outcome = interceptor.intercept(taskName, payload);
if (outcome == null) {
throw new InterceptionException("interceptor returned null for task '" + taskName + "'");
}
if (outcome instanceof Interception.Reject reject) {
throw new InterceptionException("enqueue of '" + taskName + "' rejected: " + reject.reason());
} else if (outcome instanceof Interception.Convert convert) {
payload = convert.payload();
} else if (outcome instanceof Interception.Redirect) {
throw new InterceptionException(
"interceptor Redirect is not supported for batch enqueue of task '" + taskName + "'");
}
// Pass: leave payload unchanged.
}
return payload;
}

@Override
public <T> List<String> enqueueAll(Task<T> task, List<T> payloads) {
return enqueueMany(task, payloads);
Expand Down
7 changes: 7 additions & 0 deletions sdks/java/src/main/java/org/byteveda/taskito/Taskito.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.function.Consumer;
import java.util.function.Function;
import org.byteveda.taskito.errors.ConfigurationException;
import org.byteveda.taskito.interception.Interceptor;
import org.byteveda.taskito.internal.JniQueueBackend;
import org.byteveda.taskito.locks.Lock;
import org.byteveda.taskito.locks.LockInfo;
Expand Down Expand Up @@ -82,6 +83,12 @@ static Builder builder() {
*/
Taskito predicate(String taskName, Predicate predicate);

/**
* Register an interceptor that may convert, redirect, or reject each enqueue
* before it is serialized (see {@link Interceptor}). Returns {@code this}.
*/
Taskito intercept(Interceptor interceptor);

// ── Producer ────────────────────────────────────────────────────

/** Enqueue a typed payload using the task's default options; returns the job id. */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package org.byteveda.taskito.errors;

import org.byteveda.taskito.TaskitoException;

/** An interceptor rejected an enqueue, so no job was created. */
public class InterceptionException extends TaskitoException {
public InterceptionException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package org.byteveda.taskito.interception;

/**
* The outcome of an {@link Interceptor}: one of the interception strategies.
*
* <ul>
* <li>{@link Pass} — enqueue the payload unchanged.
* <li>{@link Convert} — replace the payload (e.g. with a proxy reference).
* <li>{@link Redirect} — enqueue a different task (and payload) instead.
* <li>{@link Reject} — block the enqueue with a reason.
* </ul>
*/
public sealed interface Interception
permits Interception.Pass, Interception.Convert, Interception.Redirect, Interception.Reject {

/** Enqueue the original payload unchanged. */
record Pass() implements Interception {}

/** Enqueue {@code payload} in place of the original. */
record Convert(Object payload) implements Interception {}

/** Enqueue {@code taskName} with {@code payload} instead of the original task. */
record Redirect(String taskName, Object payload) implements Interception {}

/** Block the enqueue; {@code reason} is surfaced on the thrown exception. */
record Reject(String reason) implements Interception {}

static Interception pass() {
return new Pass();
}

static Interception convert(Object payload) {
return new Convert(payload);
}

static Interception redirect(String taskName, Object payload) {
return new Redirect(taskName, payload);
}

static Interception reject(String reason) {
return new Reject(reason);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package org.byteveda.taskito.interception;

/**
* Inspects an enqueue on the producer and decides what to do with it (see
* {@link Interception}). Runs synchronously before serialization; keep it fast.
*/
@FunctionalInterface
public interface Interceptor {
Interception intercept(String taskName, Object payload);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/**
* Producer-side argument interception: an {@link org.byteveda.taskito.interception.Interceptor}
* inspects each enqueue and returns an {@link org.byteveda.taskito.interception.Interception}
* — pass it through, convert the payload (e.g. to a
* {@link org.byteveda.taskito.proxies.ProxyRef}), redirect it to another task,
* or reject it. Register with {@code Taskito.intercept(...)}. Unlike Python's
* implicit arg-walking, interception here is an explicit transform over the
* typed payload.
*/
package org.byteveda.taskito.interception;
129 changes: 129 additions & 0 deletions sdks/java/src/test/java/org/byteveda/taskito/InterceptionTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package org.byteveda.taskito;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.byteveda.taskito.errors.InterceptionException;
import org.byteveda.taskito.interception.Interception;
import org.byteveda.taskito.task.Task;
import org.byteveda.taskito.worker.Worker;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;

class InterceptionTest {

private static final Task<Integer> A = Task.of("ic.a", Integer.class);
private static final Task<Integer> B = Task.of("ic.b", Integer.class);

@Test
@Timeout(30)
void convertsPayload(@TempDir Path dir) throws Exception {
try (Taskito queue =
Taskito.builder().url(dir.resolve("ic.db").toString()).open()) {
queue.intercept((task, payload) -> Interception.convert((Integer) payload * 10));
AtomicInteger seen = new AtomicInteger();
CountDownLatch ran = new CountDownLatch(1);
try (Worker worker = queue.worker()
.handle(A, p -> {
seen.set(p);
ran.countDown();
return p;
})
.start()) {
queue.enqueue(A, 5);
assertTrue(ran.await(20, TimeUnit.SECONDS));
assertEquals(50, seen.get());
}
}
}

@Test
@Timeout(30)
void redirectsTask(@TempDir Path dir) throws Exception {
try (Taskito queue =
Taskito.builder().url(dir.resolve("ic.db").toString()).open()) {
queue.intercept((task, payload) ->
task.equals("ic.a") ? Interception.redirect("ic.b", payload) : Interception.pass());
AtomicInteger aRan = new AtomicInteger();
CountDownLatch bRan = new CountDownLatch(1);
try (Worker worker = queue.worker()
.handle(A, p -> aRan.incrementAndGet())
.handle(B, p -> {
bRan.countDown();
return p;
})
.start()) {
queue.enqueue(A, 1);
assertTrue(bRan.await(20, TimeUnit.SECONDS));
assertEquals(0, aRan.get());
}
}
}

@Test
void rejectsEnqueue(@TempDir Path dir) {
try (Taskito queue =
Taskito.builder().url(dir.resolve("ic.db").toString()).open()) {
queue.intercept((task, payload) -> Interception.reject("blocked"));
assertThrows(InterceptionException.class, () -> queue.enqueue(A, 1));
}
}

@Test
void rejectsNullInterceptorResult(@TempDir Path dir) {
try (Taskito queue =
Taskito.builder().url(dir.resolve("ic.db").toString()).open()) {
queue.intercept((task, payload) -> null);
assertThrows(InterceptionException.class, () -> queue.enqueue(A, 1));
}
}

@Test
@Timeout(30)
void batchEnqueueConvertsPayloads(@TempDir Path dir) throws Exception {
try (Taskito queue =
Taskito.builder().url(dir.resolve("ic.db").toString()).open()) {
queue.intercept((task, payload) -> Interception.convert((Integer) payload * 10));
AtomicInteger sum = new AtomicInteger();
CountDownLatch ran = new CountDownLatch(3);
try (Worker worker = queue.worker()
.handle(A, p -> {
sum.addAndGet(p);
ran.countDown();
return p;
})
.start()) {
queue.enqueueMany(A, List.of(1, 2, 3));
assertTrue(ran.await(20, TimeUnit.SECONDS));
assertEquals(60, sum.get()); // (1+2+3)*10 — interception ran on the batch
}
}
}

@Test
void batchEnqueueRejectsViaInterceptor(@TempDir Path dir) {
try (Taskito queue =
Taskito.builder().url(dir.resolve("ic.db").toString()).open()) {
queue.intercept(
(task, payload) -> (Integer) payload < 0 ? Interception.reject("negative") : Interception.pass());
assertThrows(InterceptionException.class, () -> queue.enqueueMany(A, List.of(1, -1, 2)));
}
}

@Test
void batchEnqueueRejectsRedirect(@TempDir Path dir) {
try (Taskito queue =
Taskito.builder().url(dir.resolve("ic.db").toString()).open()) {
queue.intercept((task, payload) -> Interception.redirect("ic.b", payload));
// Redirect can't move an item out of a single-task batch — fail fast.
assertThrows(InterceptionException.class, () -> queue.enqueueMany(A, List.of(1, 2)));
}
}
}