From b43105781aff9ef2ca1b883f84993add57d034d4 Mon Sep 17 00:00:00 2001 From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com> Date: Sun, 28 Jun 2026 19:42:07 +0530 Subject: [PATCH 1/2] feat(java): observability contrib middleware TaskitoObservation wraps each task in a Micrometer Observation (metrics + trace span, OTel-pluggable); SentryMiddleware reports failures and dead-letters. Both deps are compileOnly so consumers opt in. --- sdks/java/build.gradle.kts | 8 ++ .../taskito/contrib/SentryMiddleware.java | 52 +++++++++++++ .../taskito/contrib/TaskitoObservation.java | 73 +++++++++++++++++++ .../taskito/contrib/package-info.java | 14 ++++ 4 files changed, 147 insertions(+) create mode 100644 sdks/java/src/main/java/org/byteveda/taskito/contrib/SentryMiddleware.java create mode 100644 sdks/java/src/main/java/org/byteveda/taskito/contrib/TaskitoObservation.java create mode 100644 sdks/java/src/main/java/org/byteveda/taskito/contrib/package-info.java diff --git a/sdks/java/build.gradle.kts b/sdks/java/build.gradle.kts index 307193ed..9428d60a 100644 --- a/sdks/java/build.gradle.kts +++ b/sdks/java/build.gradle.kts @@ -91,6 +91,14 @@ dependencies { compileOnly("org.msgpack:jackson-dataformat-msgpack:0.9.8") testImplementation("org.msgpack:jackson-dataformat-msgpack:0.9.8") + // Optional: observability contrib middleware. Consumers that use them add the + // matching runtime dependency themselves. + compileOnly("io.micrometer:micrometer-observation:1.13.6") + testImplementation("io.micrometer:micrometer-observation:1.13.6") + testImplementation("io.micrometer:micrometer-observation-test:1.13.6") + compileOnly("io.sentry:sentry:7.14.0") + testImplementation("io.sentry:sentry:7.14.0") + // Run the @TaskHandler processor over the tests so the generated companions // are exercised end-to-end. Consumers wire it the same way. testAnnotationProcessor(project(":processor")) diff --git a/sdks/java/src/main/java/org/byteveda/taskito/contrib/SentryMiddleware.java b/sdks/java/src/main/java/org/byteveda/taskito/contrib/SentryMiddleware.java new file mode 100644 index 00000000..0447223b --- /dev/null +++ b/sdks/java/src/main/java/org/byteveda/taskito/contrib/SentryMiddleware.java @@ -0,0 +1,52 @@ +package org.byteveda.taskito.contrib; + +import io.sentry.Sentry; +import io.sentry.SentryLevel; +import java.util.function.Predicate; +import org.byteveda.taskito.events.OutcomeEvent; +import org.byteveda.taskito.middleware.Middleware; +import org.byteveda.taskito.middleware.TaskContext; + +/** + * Reports task failures to Sentry: {@code onError} captures the exception, and + * {@code onDeadLetter} records a fatal event for a job that exhausted its + * retries. Each event is tagged with the task and job id. The application must + * initialise Sentry ({@code Sentry.init(...)}); when it has not, the calls are + * safe no-ops. + */ +public final class SentryMiddleware implements Middleware { + private final Predicate taskFilter; + + public SentryMiddleware() { + this(task -> true); + } + + public SentryMiddleware(Predicate taskFilter) { + this.taskFilter = taskFilter; + } + + @Override + public void onError(TaskContext context, Throwable error) { + if (!taskFilter.test(context.taskName)) { + return; + } + Sentry.withScope(scope -> { + scope.setTag("taskito.task", context.taskName); + scope.setTag("taskito.job", context.jobId); + Sentry.captureException(error); + }); + } + + @Override + public void onDeadLetter(OutcomeEvent event) { + if (!taskFilter.test(event.taskName)) { + return; + } + Sentry.withScope(scope -> { + scope.setLevel(SentryLevel.FATAL); + scope.setTag("taskito.task", event.taskName); + scope.setTag("taskito.job", event.jobId); + Sentry.captureMessage("task dead-lettered: " + (event.error == null ? "" : event.error)); + }); + } +} diff --git a/sdks/java/src/main/java/org/byteveda/taskito/contrib/TaskitoObservation.java b/sdks/java/src/main/java/org/byteveda/taskito/contrib/TaskitoObservation.java new file mode 100644 index 00000000..b0643a2b --- /dev/null +++ b/sdks/java/src/main/java/org/byteveda/taskito/contrib/TaskitoObservation.java @@ -0,0 +1,73 @@ +package org.byteveda.taskito.contrib; + +import io.micrometer.observation.Observation; +import io.micrometer.observation.ObservationRegistry; +import java.util.function.Predicate; +import org.byteveda.taskito.middleware.Middleware; +import org.byteveda.taskito.middleware.TaskContext; + +/** + * Wraps each task execution in a Micrometer {@link Observation}: one + * instrumentation that yields both metrics (a timer) and a trace span. Plug in + * OpenTelemetry (or any backend) by configuring the {@link ObservationRegistry} + * the application passes in. + * + *

The observation is created in {@code before}, made current for the handler + * (an open {@link Observation.Scope}), and stopped in {@code after}/{@code onError}; + * state is carried on the per-task {@link TaskContext#attributes()}. + */ +public final class TaskitoObservation implements Middleware { + private static final String OBSERVATION = "taskito.contrib.observation"; + private static final String SCOPE = "taskito.contrib.observation.scope"; + + private final ObservationRegistry registry; + private final String name; + private final Predicate taskFilter; + + public TaskitoObservation(ObservationRegistry registry) { + this(registry, "taskito.task", task -> true); + } + + public TaskitoObservation(ObservationRegistry registry, String name, Predicate taskFilter) { + this.registry = registry; + this.name = name; + this.taskFilter = taskFilter; + } + + @Override + public void before(TaskContext context) { + if (!taskFilter.test(context.taskName)) { + return; + } + Observation observation = Observation.createNotStarted(name, registry) + .lowCardinalityKeyValue("taskito.task", context.taskName) + .start(); + context.attributes().put(OBSERVATION, observation); + context.attributes().put(SCOPE, observation.openScope()); + } + + @Override + public void after(TaskContext context, Object result) { + stop(context, null); + } + + @Override + public void onError(TaskContext context, Throwable error) { + stop(context, error); + } + + private void stop(TaskContext context, Throwable error) { + Observation.Scope scope = (Observation.Scope) context.attributes().remove(SCOPE); + if (scope != null) { + scope.close(); + } + Observation observation = (Observation) context.attributes().remove(OBSERVATION); + if (observation == null) { + return; + } + if (error != null) { + observation.error(error); + } + observation.stop(); + } +} diff --git a/sdks/java/src/main/java/org/byteveda/taskito/contrib/package-info.java b/sdks/java/src/main/java/org/byteveda/taskito/contrib/package-info.java new file mode 100644 index 00000000..da4fd4c9 --- /dev/null +++ b/sdks/java/src/main/java/org/byteveda/taskito/contrib/package-info.java @@ -0,0 +1,14 @@ +/** + * Optional observability middleware. Each integration's third-party dependency + * is {@code compileOnly}, so a consumer that uses one adds the matching runtime + * dependency to their build. + * + *

+ */ +package org.byteveda.taskito.contrib; From faddb90a8185fd4187ddb48ad2f2b517d8278e7c Mon Sep 17 00:00:00 2001 From: Pratyush Sharma <56130065+pratyush618@users.noreply.github.com> Date: Sun, 28 Jun 2026 19:42:07 +0530 Subject: [PATCH 2/2] test(java): cover observability middleware Observation success/error/filter via TestObservationRegistry; Sentry no-op safety + filter. --- .../contrib/ObservationMiddlewareTest.java | 55 +++++++++++++++++++ .../taskito/contrib/SentryMiddlewareTest.java | 25 +++++++++ 2 files changed, 80 insertions(+) create mode 100644 sdks/java/src/test/java/org/byteveda/taskito/contrib/ObservationMiddlewareTest.java create mode 100644 sdks/java/src/test/java/org/byteveda/taskito/contrib/SentryMiddlewareTest.java diff --git a/sdks/java/src/test/java/org/byteveda/taskito/contrib/ObservationMiddlewareTest.java b/sdks/java/src/test/java/org/byteveda/taskito/contrib/ObservationMiddlewareTest.java new file mode 100644 index 00000000..cc36253f --- /dev/null +++ b/sdks/java/src/test/java/org/byteveda/taskito/contrib/ObservationMiddlewareTest.java @@ -0,0 +1,55 @@ +package org.byteveda.taskito.contrib; + +import static io.micrometer.observation.tck.TestObservationRegistryAssert.assertThat; + +import io.micrometer.observation.tck.TestObservationRegistry; +import org.byteveda.taskito.middleware.TaskContext; +import org.junit.jupiter.api.Test; + +class ObservationMiddlewareTest { + + @Test + void recordsObservationOnSuccess() { + TestObservationRegistry registry = TestObservationRegistry.create(); + TaskitoObservation middleware = new TaskitoObservation(registry); + TaskContext context = new TaskContext("job-1", "my.task"); + + middleware.before(context); + middleware.after(context, "ok"); + + assertThat(registry) + .hasObservationWithNameEqualTo("taskito.task") + .that() + .hasBeenStarted() + .hasBeenStopped() + .hasLowCardinalityKeyValue("taskito.task", "my.task"); + } + + @Test + void recordsErrorOnFailure() { + TestObservationRegistry registry = TestObservationRegistry.create(); + TaskitoObservation middleware = new TaskitoObservation(registry); + TaskContext context = new TaskContext("job-2", "my.task"); + + middleware.before(context); + middleware.onError(context, new IllegalStateException("boom")); + + assertThat(registry) + .hasObservationWithNameEqualTo("taskito.task") + .that() + .hasError(); + } + + @Test + void filterSkipsUnobservedTasks() { + TestObservationRegistry registry = TestObservationRegistry.create(); + TaskitoObservation middleware = + new TaskitoObservation(registry, "taskito.task", task -> task.equals("included")); + TaskContext context = new TaskContext("job-3", "excluded"); + + middleware.before(context); + middleware.after(context, "ok"); + + assertThat(registry).doesNotHaveAnyObservation(); + } +} diff --git a/sdks/java/src/test/java/org/byteveda/taskito/contrib/SentryMiddlewareTest.java b/sdks/java/src/test/java/org/byteveda/taskito/contrib/SentryMiddlewareTest.java new file mode 100644 index 00000000..af319ac4 --- /dev/null +++ b/sdks/java/src/test/java/org/byteveda/taskito/contrib/SentryMiddlewareTest.java @@ -0,0 +1,25 @@ +package org.byteveda.taskito.contrib; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; + +import org.byteveda.taskito.events.EventName; +import org.byteveda.taskito.events.OutcomeEvent; +import org.byteveda.taskito.middleware.TaskContext; +import org.junit.jupiter.api.Test; + +class SentryMiddlewareTest { + + @Test + void safeNoOpWhenSentryNotInitialized() { + SentryMiddleware middleware = new SentryMiddleware(); + // With no Sentry.init(...), the hooks must not break the worker. + assertDoesNotThrow(() -> middleware.onError(new TaskContext("j", "t"), new RuntimeException("x"))); + assertDoesNotThrow(() -> middleware.onDeadLetter(new OutcomeEvent(EventName.DEAD, "j", "t", "err", 0, false))); + } + + @Test + void filterSkipsUnreportedTasks() { + SentryMiddleware middleware = new SentryMiddleware(task -> task.equals("included")); + assertDoesNotThrow(() -> middleware.onError(new TaskContext("j", "excluded"), new RuntimeException("x"))); + } +}