Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

### Features

- Add `sentry-kafka` module for Kafka queue instrumentation without Spring ([#5288](https://github.com/getsentry/sentry-java/pull/5288))
- Add Kafka queue tracing for Spring Boot 3 ([#5254](https://github.com/getsentry/sentry-java/pull/5254)), ([#5255](https://github.com/getsentry/sentry-java/pull/5255)), ([#5256](https://github.com/getsentry/sentry-java/pull/5256))
- Add `enableQueueTracing` option and messaging span data conventions ([#5250](https://github.com/getsentry/sentry-java/pull/5250))
- Prevent cross-organization trace continuation ([#5136](https://github.com/getsentry/sentry-java/pull/5136))
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ Sentry SDK for Java and Android
| sentry | ![Maven Central Version](https://img.shields.io/maven-central/v/io.sentry/sentry?style=for-the-badge&logo=sentry&color=green) | 21 |
| sentry-jul | ![Maven Central Version](https://img.shields.io/maven-central/v/io.sentry/sentry-jul?style=for-the-badge&logo=sentry&color=green) |
| sentry-jdbc | ![Maven Central Version](https://img.shields.io/maven-central/v/io.sentry/sentry-jdbc?style=for-the-badge&logo=sentry&color=green) |
| sentry-kafka | ![Maven Central Version](https://img.shields.io/maven-central/v/io.sentry/sentry-kafka?style=for-the-badge&logo=sentry&color=green) |
| sentry-apollo | ![Maven Central Version](https://img.shields.io/maven-central/v/io.sentry/sentry-apollo?style=for-the-badge&logo=sentry&color=green) | 21 |
| sentry-apollo-3 | ![Maven Central Version](https://img.shields.io/maven-central/v/io.sentry/sentry-apollo-3?style=for-the-badge&logo=sentry&color=green) | 21 |
| sentry-apollo-4 | ![Maven Central Version](https://img.shields.io/maven-central/v/io.sentry/sentry-apollo-4?style=for-the-badge&logo=sentry&color=green) | 21 |
Expand Down
1 change: 1 addition & 0 deletions buildSrc/src/main/java/Config.kt
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ object Config {
val SENTRY_JCACHE_SDK_NAME = "$SENTRY_JAVA_SDK_NAME.jcache"
val SENTRY_QUARTZ_SDK_NAME = "$SENTRY_JAVA_SDK_NAME.quartz"
val SENTRY_JDBC_SDK_NAME = "$SENTRY_JAVA_SDK_NAME.jdbc"
val SENTRY_KAFKA_SDK_NAME = "$SENTRY_JAVA_SDK_NAME.kafka"
val SENTRY_OPENFEATURE_SDK_NAME = "$SENTRY_JAVA_SDK_NAME.openfeature"
val SENTRY_LAUNCHDARKLY_SERVER_SDK_NAME = "$SENTRY_JAVA_SDK_NAME.launchdarkly-server"
val SENTRY_LAUNCHDARKLY_ANDROID_SDK_NAME = "$SENTRY_ANDROID_SDK_NAME.launchdarkly"
Expand Down
1 change: 1 addition & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ springboot3-starter-jdbc = { module = "org.springframework.boot:spring-boot-star
springboot3-starter-actuator = { module = "org.springframework.boot:spring-boot-starter-actuator", version.ref = "springboot3" }
springboot3-starter-cache = { module = "org.springframework.boot:spring-boot-starter-cache", version.ref = "springboot3" }
spring-kafka3 = { module = "org.springframework.kafka:spring-kafka", version = "3.3.5" }
kafka-clients = { module = "org.apache.kafka:kafka-clients", version = "3.8.1" }
springboot4-otel = { module = "io.opentelemetry.instrumentation:opentelemetry-spring-boot-starter", version.ref = "otelInstrumentation" }
springboot4-resttestclient = { module = "org.springframework.boot:spring-boot-resttestclient", version.ref = "springboot4" }
springboot4-starter = { module = "org.springframework.boot:spring-boot-starter", version.ref = "springboot4" }
Expand Down
5 changes: 5 additions & 0 deletions sentry-kafka/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# sentry-kafka

This module provides Kafka-native queue instrumentation for applications using `kafka-clients` directly.

Spring users should use `sentry-spring-boot-jakarta` / `sentry-spring-jakarta`, which provide higher-fidelity consumer instrumentation via Spring Kafka hooks.
25 changes: 25 additions & 0 deletions sentry-kafka/api/sentry-kafka.api
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
public final class io/sentry/kafka/BuildConfig {
public static final field SENTRY_KAFKA_SDK_NAME Ljava/lang/String;
public static final field VERSION_NAME Ljava/lang/String;
}

public final class io/sentry/kafka/SentryKafkaConsumerInterceptor : org/apache/kafka/clients/consumer/ConsumerInterceptor {
public static final field TRACE_ORIGIN Ljava/lang/String;
public fun <init> (Lio/sentry/IScopes;)V
public fun close ()V
public fun configure (Ljava/util/Map;)V
public fun onCommit (Ljava/util/Map;)V
public fun onConsume (Lorg/apache/kafka/clients/consumer/ConsumerRecords;)Lorg/apache/kafka/clients/consumer/ConsumerRecords;
}

public final class io/sentry/kafka/SentryKafkaProducerInterceptor : org/apache/kafka/clients/producer/ProducerInterceptor {
public static final field SENTRY_ENQUEUED_TIME_HEADER Ljava/lang/String;
public static final field TRACE_ORIGIN Ljava/lang/String;
public fun <init> (Lio/sentry/IScopes;)V
public fun <init> (Lio/sentry/IScopes;Ljava/lang/String;)V
public fun close ()V
public fun configure (Ljava/util/Map;)V
public fun onAcknowledgement (Lorg/apache/kafka/clients/producer/RecordMetadata;Ljava/lang/Exception;)V
public fun onSend (Lorg/apache/kafka/clients/producer/ProducerRecord;)Lorg/apache/kafka/clients/producer/ProducerRecord;
}

83 changes: 83 additions & 0 deletions sentry-kafka/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import net.ltgt.gradle.errorprone.errorprone
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

plugins {
    `java-library`
    id("io.sentry.javadoc")
    alias(libs.plugins.kotlin.jvm)
    jacoco
    alias(libs.plugins.errorprone)
    alias(libs.plugins.gradle.versions)
    alias(libs.plugins.buildconfig)
}

// Emit Java 8 bytecode from Kotlin sources.
tasks.withType<KotlinCompile>().configureEach {
    compilerOptions.jvmTarget = org.jetbrains.kotlin.gradle.dsl.JvmTarget.JVM_1_8
}

dependencies {
    api(projects.sentry)
    // kafka-clients is compileOnly: the application supplies its own Kafka client at runtime,
    // this module is not bundling or pinning a Kafka version.
    compileOnly(libs.kafka.clients)
    compileOnly(libs.jetbrains.annotations)
    compileOnly(libs.nopen.annotations)

    errorprone(libs.errorprone.core)
    errorprone(libs.nopen.checker)
    errorprone(libs.nullaway)

    // tests (kafka-clients is needed on the test classpath since it is compileOnly above)
    testImplementation(projects.sentryTestSupport)
    testImplementation(kotlin(Config.kotlinStdLib))
    testImplementation(libs.kotlin.test.junit)
    testImplementation(libs.mockito.kotlin)
    testImplementation(libs.mockito.inline)
    testImplementation(libs.kafka.clients)
}

configure<SourceSetContainer> { test { java.srcDir("src/test/java") } }

jacoco { toolVersion = libs.versions.jacoco.get() }

tasks.jacocoTestReport {
    reports {
        xml.required.set(true)
        html.required.set(false)
    }
}

tasks {
    // Coverage report and minimum-coverage gate both run as part of `check`.
    jacocoTestCoverageVerification {
        violationRules { rule { limit { minimum = Config.QualityPlugins.Jacoco.minimumCoverage } } }
    }
    check {
        dependsOn(jacocoTestCoverageVerification)
        dependsOn(jacocoTestReport)
    }
}

// NullAway null-safety checks are errors for all code under io.sentry.
tasks.withType<JavaCompile>().configureEach {
    options.errorprone {
        check("NullAway", net.ltgt.gradle.errorprone.CheckSeverity.ERROR)
        option("NullAway:AnnotatedPackages", "io.sentry")
    }
}

// Generates io.sentry.kafka.BuildConfig (Java) exposing the SDK name and version.
buildConfig {
    useJavaOutput()
    packageName("io.sentry.kafka")
    buildConfigField("String", "SENTRY_KAFKA_SDK_NAME", "\"${Config.Sentry.SENTRY_KAFKA_SDK_NAME}\"")
    buildConfigField("String", "VERSION_NAME", "\"${project.version}\"")
}

tasks.jar {
    manifest {
        attributes(
            "Sentry-Version-Name" to project.version,
            "Sentry-SDK-Name" to Config.Sentry.SENTRY_KAFKA_SDK_NAME,
            "Sentry-SDK-Package-Name" to "maven:io.sentry:sentry-kafka",
            "Implementation-Vendor" to "Sentry",
            "Implementation-Title" to project.name,
            "Implementation-Version" to project.version,
        )
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package io.sentry.kafka;

import io.sentry.BaggageHeader;
import io.sentry.IScopes;
import io.sentry.ITransaction;
import io.sentry.SentryTraceHeader;
import io.sentry.SpanDataConvention;
import io.sentry.SpanStatus;
import io.sentry.TransactionContext;
import io.sentry.TransactionOptions;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/**
 * A Kafka {@link ConsumerInterceptor} that reports a {@code queue.receive} transaction for every
 * non-empty batch returned by the consumer's poll, when queue tracing is enabled.
 *
 * <p>The incoming trace is continued from the {@code sentry-trace}/{@code baggage} headers of the
 * first record in the batch. The transaction is started without binding it to the scope and is
 * finished immediately with {@link SpanStatus#OK}.
 *
 * <p>Any failure inside the instrumentation is swallowed so it can never break the application's
 * poll loop; the batch is always returned unchanged.
 */
@ApiStatus.Internal
public final class SentryKafkaConsumerInterceptor<K, V> implements ConsumerInterceptor<K, V> {

  public static final @NotNull String TRACE_ORIGIN = "auto.queue.kafka.consumer";

  private final @NotNull IScopes scopes;

  public SentryKafkaConsumerInterceptor(final @NotNull IScopes scopes) {
    this.scopes = scopes;
  }

  @Override
  public @NotNull ConsumerRecords<K, V> onConsume(final @NotNull ConsumerRecords<K, V> records) {
    if (records.isEmpty() || !scopes.getOptions().isEnableQueueTracing()) {
      return records;
    }

    final @NotNull ConsumerRecord<K, V> first = records.iterator().next();

    try {
      reportReceiveTransaction(records, first);
    } catch (Throwable ignored) {
      // Instrumentation must never break the customer's Kafka poll loop.
    }

    return records;
  }

  @Override
  public void onCommit(final @NotNull Map<TopicPartition, OffsetAndMetadata> offsets) {
    // No-op: offset commits are not instrumented.
  }

  @Override
  public void close() {
    // No resources to release.
  }

  @Override
  public void configure(final @Nullable Map<String, ?> configs) {
    // No configuration is read from the Kafka client properties.
  }

  /** Starts, populates, and immediately finishes the {@code queue.receive} transaction. */
  private void reportReceiveTransaction(
      final @NotNull ConsumerRecords<K, V> records, final @NotNull ConsumerRecord<K, V> first) {
    @Nullable TransactionContext context = extractTraceContext(first);
    if (context == null) {
      context = new TransactionContext("queue.receive", "queue.receive");
    }
    // Normalize name/operation also for contexts produced by trace continuation.
    context.setName("queue.receive");
    context.setOperation("queue.receive");

    final @NotNull TransactionOptions options = new TransactionOptions();
    options.setOrigin(TRACE_ORIGIN);
    options.setBindToScope(false);

    final @NotNull ITransaction transaction = scopes.startTransaction(context, options);
    if (transaction.isNoOp()) {
      return;
    }
    transaction.setData(SpanDataConvention.MESSAGING_SYSTEM, "kafka");
    transaction.setData(SpanDataConvention.MESSAGING_DESTINATION_NAME, first.topic());
    transaction.setData("messaging.batch.message.count", records.count());
    transaction.setStatus(SpanStatus.OK);
    transaction.finish();
  }

  /**
   * Continues the incoming trace from the record's {@code sentry-trace}/{@code baggage} headers.
   * Returns whatever {@link IScopes#continueTrace} yields, which may be {@code null}.
   */
  private @Nullable TransactionContext extractTraceContext(
      final @NotNull ConsumerRecord<K, V> record) {
    final @Nullable String trace = readHeader(record, SentryTraceHeader.SENTRY_TRACE_HEADER);
    final @Nullable String baggage = readHeader(record, BaggageHeader.BAGGAGE_HEADER);
    return scopes.continueTrace(
        trace, baggage == null ? null : Collections.singletonList(baggage));
  }

  /** Returns the UTF-8 value of the last header with the given name, or {@code null} if absent. */
  private @Nullable String readHeader(
      final @NotNull ConsumerRecord<K, V> record, final @NotNull String name) {
    final @Nullable Header header = record.headers().lastHeader(name);
    final byte @Nullable [] raw = header == null ? null : header.value();
    return raw == null ? null : new String(raw, StandardCharsets.UTF_8);
  }
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.sentry.spring.jakarta.kafka;
package io.sentry.kafka;

import io.sentry.BaggageHeader;
import io.sentry.DateUtils;
Expand All @@ -19,28 +19,23 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/**
* A Kafka {@link ProducerInterceptor} that creates {@code queue.publish} spans and injects tracing
* headers into outgoing records.
*
* <p>The span starts and finishes synchronously in {@link #onSend(ProducerRecord)}, representing
* "message enqueued" semantics. This avoids cross-thread correlation complexity since {@link
* #onAcknowledgement(RecordMetadata, Exception)} runs on the Kafka I/O thread.
*
* <p>If the customer already has a {@link ProducerInterceptor}, the {@link
* SentryKafkaProducerBeanPostProcessor} composes both using Spring's {@link
* org.springframework.kafka.support.CompositeProducerInterceptor}.
*/
@ApiStatus.Internal
public final class SentryProducerInterceptor<K, V> implements ProducerInterceptor<K, V> {
public final class SentryKafkaProducerInterceptor<K, V> implements ProducerInterceptor<K, V> {

static final String TRACE_ORIGIN = "auto.queue.spring_jakarta.kafka.producer";
static final String SENTRY_ENQUEUED_TIME_HEADER = "sentry-task-enqueued-time";
public static final @NotNull String TRACE_ORIGIN = "auto.queue.kafka.producer";
public static final @NotNull String SENTRY_ENQUEUED_TIME_HEADER = "sentry-task-enqueued-time";

private final @NotNull IScopes scopes;
private final @NotNull String traceOrigin;

public SentryProducerInterceptor(final @NotNull IScopes scopes) {
public SentryKafkaProducerInterceptor(final @NotNull IScopes scopes) {
this(scopes, TRACE_ORIGIN);
}

public SentryKafkaProducerInterceptor(
final @NotNull IScopes scopes, final @NotNull String traceOrigin) {
this.scopes = scopes;
this.traceOrigin = traceOrigin;
}

@Override
Expand All @@ -56,7 +51,7 @@ public SentryProducerInterceptor(final @NotNull IScopes scopes) {

try {
final @NotNull SpanOptions spanOptions = new SpanOptions();
spanOptions.setOrigin(TRACE_ORIGIN);
spanOptions.setOrigin(traceOrigin);
final @NotNull ISpan span =
activeSpan.startChild("queue.publish", record.topic(), spanOptions);
if (span.isNoOp()) {
Expand All @@ -71,7 +66,7 @@ public SentryProducerInterceptor(final @NotNull IScopes scopes) {
span.setStatus(SpanStatus.OK);
span.finish();
} catch (Throwable ignored) {
// Instrumentation must never break the customer's Kafka send
// Instrumentation must never break the customer's Kafka send.
}

return record;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package io.sentry.kafka

import io.sentry.IScopes
import io.sentry.ITransaction
import io.sentry.SentryOptions
import io.sentry.TransactionContext
import io.sentry.TransactionOptions
import kotlin.test.Test
import kotlin.test.assertSame
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import org.mockito.kotlin.any
import org.mockito.kotlin.anyOrNull
import org.mockito.kotlin.mock
import org.mockito.kotlin.never
import org.mockito.kotlin.verify
import org.mockito.kotlin.whenever

class SentryKafkaConsumerInterceptorTest {

    @Test
    fun `does nothing when queue tracing is disabled`() {
        val scopes = mock<IScopes>()
        val options = SentryOptions().apply { isEnableQueueTracing = false }
        whenever(scopes.options).thenReturn(options)

        val interceptor = SentryKafkaConsumerInterceptor<String, String>(scopes)
        val records = singleRecordBatch()

        val result = interceptor.onConsume(records)

        // The batch must be passed through unchanged and no transaction started.
        assertSame(records, result)
        verify(scopes, never()).startTransaction(any<TransactionContext>(), any<TransactionOptions>())
    }

    @Test
    fun `starts and finishes queue receive transaction for consumed batch`() {
        val scopes = mock<IScopes>()
        val options = SentryOptions().apply { isEnableQueueTracing = true }
        val transaction = mock<ITransaction>()

        whenever(scopes.options).thenReturn(options)
        // The record carries no sentry-trace/baggage headers, so the interceptor calls
        // continueTrace(null, null). mockito-kotlin's any() does NOT match null arguments,
        // so anyOrNull() is required for this stub to actually apply.
        whenever(scopes.continueTrace(anyOrNull(), anyOrNull())).thenReturn(null)
        whenever(scopes.startTransaction(any<TransactionContext>(), any<TransactionOptions>()))
            .thenReturn(transaction)
        whenever(transaction.isNoOp).thenReturn(false)

        val interceptor = SentryKafkaConsumerInterceptor<String, String>(scopes)

        interceptor.onConsume(singleRecordBatch())

        verify(scopes).startTransaction(any<TransactionContext>(), any<TransactionOptions>())
        verify(transaction).setData("messaging.system", "kafka")
        verify(transaction).setData("messaging.destination.name", "my-topic")
        verify(transaction).setData("messaging.batch.message.count", 1)
        verify(transaction).finish()
    }

    @Test
    fun `commit callback is no-op`() {
        val interceptor = SentryKafkaConsumerInterceptor<String, String>(mock())

        // Must not throw; onCommit is documented as a no-op.
        interceptor.onCommit(mapOf(TopicPartition("my-topic", 0) to OffsetAndMetadata(1)))
    }

    // Builds a ConsumerRecords batch containing a single record on my-topic/partition 0.
    private fun singleRecordBatch(): ConsumerRecords<String, String> {
        val partition = TopicPartition("my-topic", 0)
        val record = ConsumerRecord("my-topic", 0, 0L, "key", "value")
        return ConsumerRecords(mapOf(partition to listOf(record)))
    }
}
Loading
Loading