Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions sdks/java/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@ dependencies {
compileOnly("org.msgpack:jackson-dataformat-msgpack:0.9.8")
testImplementation("org.msgpack:jackson-dataformat-msgpack:0.9.8")

// Optional: observability contrib middleware. Consumers that use them add the
// matching runtime dependency themselves.
compileOnly("io.micrometer:micrometer-observation:1.13.6")
testImplementation("io.micrometer:micrometer-observation:1.13.6")
testImplementation("io.micrometer:micrometer-observation-test:1.13.6")
compileOnly("io.sentry:sentry:7.14.0")
testImplementation("io.sentry:sentry:7.14.0")

// Run the @TaskHandler processor over the tests so the generated companions
// are exercised end-to-end. Consumers wire it the same way.
testAnnotationProcessor(project(":processor"))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package org.byteveda.taskito.contrib;

import io.sentry.Sentry;
import io.sentry.SentryLevel;
import java.util.function.Predicate;
import org.byteveda.taskito.events.OutcomeEvent;
import org.byteveda.taskito.middleware.Middleware;
import org.byteveda.taskito.middleware.TaskContext;

/**
* Reports task failures to Sentry: {@code onError} captures the exception, and
* {@code onDeadLetter} records a fatal event for a job that exhausted its
* retries. Each event is tagged with the task and job id. The application must
* initialise Sentry ({@code Sentry.init(...)}); when it has not, the calls are
* safe no-ops.
*/
public final class SentryMiddleware implements Middleware {
private final Predicate<String> taskFilter;

public SentryMiddleware() {
this(task -> true);
}

public SentryMiddleware(Predicate<String> taskFilter) {
this.taskFilter = taskFilter;
}

@Override
public void onError(TaskContext context, Throwable error) {
if (!taskFilter.test(context.taskName)) {
return;
}
Sentry.withScope(scope -> {
scope.setTag("taskito.task", context.taskName);
scope.setTag("taskito.job", context.jobId);
Sentry.captureException(error);
});
}

@Override
public void onDeadLetter(OutcomeEvent event) {
if (!taskFilter.test(event.taskName)) {
return;
}
Sentry.withScope(scope -> {
scope.setLevel(SentryLevel.FATAL);
scope.setTag("taskito.task", event.taskName);
scope.setTag("taskito.job", event.jobId);
Sentry.captureMessage("task dead-lettered: " + (event.error == null ? "" : event.error));
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package org.byteveda.taskito.contrib;

import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import java.util.function.Predicate;
import org.byteveda.taskito.middleware.Middleware;
import org.byteveda.taskito.middleware.TaskContext;

/**
* Wraps each task execution in a Micrometer {@link Observation}: one
* instrumentation that yields both metrics (a timer) and a trace span. Plug in
* OpenTelemetry (or any backend) by configuring the {@link ObservationRegistry}
* the application passes in.
*
* <p>The observation is created in {@code before}, made current for the handler
* (an open {@link Observation.Scope}), and stopped in {@code after}/{@code onError};
* state is carried on the per-task {@link TaskContext#attributes()}.
*/
public final class TaskitoObservation implements Middleware {
private static final String OBSERVATION = "taskito.contrib.observation";
private static final String SCOPE = "taskito.contrib.observation.scope";

private final ObservationRegistry registry;
private final String name;
private final Predicate<String> taskFilter;

public TaskitoObservation(ObservationRegistry registry) {
this(registry, "taskito.task", task -> true);
}

public TaskitoObservation(ObservationRegistry registry, String name, Predicate<String> taskFilter) {
this.registry = registry;
this.name = name;
this.taskFilter = taskFilter;
}

@Override
public void before(TaskContext context) {
if (!taskFilter.test(context.taskName)) {
return;
}
Observation observation = Observation.createNotStarted(name, registry)
.lowCardinalityKeyValue("taskito.task", context.taskName)
.start();
context.attributes().put(OBSERVATION, observation);
context.attributes().put(SCOPE, observation.openScope());
}

@Override
public void after(TaskContext context, Object result) {
stop(context, null);
}

@Override
public void onError(TaskContext context, Throwable error) {
stop(context, error);
}

private void stop(TaskContext context, Throwable error) {
Observation.Scope scope = (Observation.Scope) context.attributes().remove(SCOPE);
if (scope != null) {
scope.close();
}
Observation observation = (Observation) context.attributes().remove(OBSERVATION);
if (observation == null) {
return;
}
if (error != null) {
observation.error(error);
}
observation.stop();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/**
* Optional observability middleware. Each integration's third-party dependency
* is {@code compileOnly}, so a consumer that uses one adds the matching runtime
* dependency to their build.
*
* <ul>
* <li>{@link org.byteveda.taskito.contrib.TaskitoObservation} — Micrometer
* Observation per task (one instrumentation yields metrics + a trace span;
* plug OpenTelemetry in as the backend).
* <li>{@link org.byteveda.taskito.contrib.SentryMiddleware} — report task
* failures and dead-letters to Sentry.
* </ul>
*/
package org.byteveda.taskito.contrib;
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package org.byteveda.taskito.contrib;

import static io.micrometer.observation.tck.TestObservationRegistryAssert.assertThat;

import io.micrometer.observation.tck.TestObservationRegistry;
import org.byteveda.taskito.middleware.TaskContext;
import org.junit.jupiter.api.Test;

class ObservationMiddlewareTest {

@Test
void recordsObservationOnSuccess() {
TestObservationRegistry registry = TestObservationRegistry.create();
TaskitoObservation middleware = new TaskitoObservation(registry);
TaskContext context = new TaskContext("job-1", "my.task");

middleware.before(context);
middleware.after(context, "ok");

assertThat(registry)
.hasObservationWithNameEqualTo("taskito.task")
.that()
.hasBeenStarted()
.hasBeenStopped()
.hasLowCardinalityKeyValue("taskito.task", "my.task");
}

@Test
void recordsErrorOnFailure() {
TestObservationRegistry registry = TestObservationRegistry.create();
TaskitoObservation middleware = new TaskitoObservation(registry);
TaskContext context = new TaskContext("job-2", "my.task");

middleware.before(context);
middleware.onError(context, new IllegalStateException("boom"));

assertThat(registry)
.hasObservationWithNameEqualTo("taskito.task")
.that()
.hasError();
}

@Test
void filterSkipsUnobservedTasks() {
TestObservationRegistry registry = TestObservationRegistry.create();
TaskitoObservation middleware =
new TaskitoObservation(registry, "taskito.task", task -> task.equals("included"));
TaskContext context = new TaskContext("job-3", "excluded");

middleware.before(context);
middleware.after(context, "ok");

assertThat(registry).doesNotHaveAnyObservation();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package org.byteveda.taskito.contrib;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;

import org.byteveda.taskito.events.EventName;
import org.byteveda.taskito.events.OutcomeEvent;
import org.byteveda.taskito.middleware.TaskContext;
import org.junit.jupiter.api.Test;

class SentryMiddlewareTest {

@Test
void safeNoOpWhenSentryNotInitialized() {
SentryMiddleware middleware = new SentryMiddleware();
// With no Sentry.init(...), the hooks must not break the worker.
assertDoesNotThrow(() -> middleware.onError(new TaskContext("j", "t"), new RuntimeException("x")));
assertDoesNotThrow(() -> middleware.onDeadLetter(new OutcomeEvent(EventName.DEAD, "j", "t", "err", 0, false)));
}

@Test
void filterSkipsUnreportedTasks() {
SentryMiddleware middleware = new SentryMiddleware(task -> task.equals("included"));
assertDoesNotThrow(() -> middleware.onError(new TaskContext("j", "excluded"), new RuntimeException("x")));
}
}