diff --git a/pom.xml b/pom.xml
index 44c9b449bd..dd320dc72b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -183,7 +183,6 @@
1.62.0
- 1.41.1
true
_
@@ -394,21 +393,11 @@
opentelemetry-api
${opentelemetry.version}
-
- io.opentelemetry
- opentelemetry-sdk
- ${opentelemetry.version}
-
io.opentelemetry
opentelemetry-sdk-testing
${opentelemetry.version}
-
- io.opentelemetry.semconv
- opentelemetry-semconv
- ${opentelemetry-semconv.version}
-
io.opentelemetry
opentelemetry-context
diff --git a/ratis-common/pom.xml b/ratis-common/pom.xml
index ba19c73e33..1d7a69fa58 100644
--- a/ratis-common/pom.xml
+++ b/ratis-common/pom.xml
@@ -46,17 +46,10 @@
io.opentelemetry
opentelemetry-context
-
- io.opentelemetry
- opentelemetry-sdk
-
io.opentelemetry
opentelemetry-sdk-testing
-
-
- io.opentelemetry.semconv
- opentelemetry-semconv
+ test
diff --git a/ratis-common/src/main/java/org/apache/ratis/trace/NoOpTraceProvider.java b/ratis-common/src/main/java/org/apache/ratis/trace/NoOpTraceProvider.java
new file mode 100644
index 0000000000..66451d5c00
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/trace/NoOpTraceProvider.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.trace;
+
+import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.util.function.CheckedSupplier;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+enum NoOpTraceProvider implements TraceProvider {
+ INSTANCE;
+
+ @Override
+ public CompletableFuture traceClientSend(
+ CheckedSupplier, THROWABLE> action,
+ RaftClientRequest.Type type, RaftPeerId server) throws THROWABLE {
+ return action.get();
+ }
+
+ @Override
+ public CompletableFuture traceServerRequest(
+ CheckedSupplier, THROWABLE> action,
+ RaftClientRequest request, String memberId, String spanName) throws THROWABLE {
+ return action.get();
+ }
+
+ @Override
+ public CompletableFuture traceAppendEntries(
+ CheckedSupplier, IOException> action,
+ AppendEntriesRequestProto request, String memberId) throws IOException {
+ return action.get();
+ }
+}
diff --git a/ratis-common/src/main/java/org/apache/ratis/trace/TraceClient.java b/ratis-common/src/main/java/org/apache/ratis/trace/TraceClient.java
index e7145eb947..a4309f6a50 100644
--- a/ratis-common/src/main/java/org/apache/ratis/trace/TraceClient.java
+++ b/ratis-common/src/main/java/org/apache/ratis/trace/TraceClient.java
@@ -17,19 +17,14 @@
*/
package org.apache.ratis.trace;
-import io.opentelemetry.api.trace.Span;
-import io.opentelemetry.api.trace.SpanKind;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.function.CheckedSupplier;
import java.util.concurrent.CompletableFuture;
-/** Client-side OpenTelemetry helpers. */
+/** Client-side tracing helpers. */
public final class TraceClient {
- private static final String LEADER = "LEADER";
-
private TraceClient() {
}
@@ -39,25 +34,6 @@ private TraceClient() {
public static CompletableFuture asyncSend(
CheckedSupplier, THROWABLE> action,
RaftClientRequest.Type type, RaftPeerId server) throws THROWABLE {
- if (!TraceUtils.isEnabled()) {
- return action.get();
- }
- return TraceUtils.traceAsyncMethod(action,
- () -> createClientOperationSpan(type, server, SpanNames.ASYNC_SEND));
- }
-
- private static Span createClientOperationSpan(RaftClientRequest.Type type, RaftPeerId server,
- String spanName) {
- Preconditions.assertNotNull(spanName, () -> "Span name cannot be null");
- Preconditions.assertTrue(!spanName.isEmpty(), "Span name should not be empty");
- String peerId = server == null ? LEADER : String.valueOf(server);
- final Span span = TraceUtils.getGlobalTracer()
- .spanBuilder(spanName)
- .setSpanKind(SpanKind.CLIENT)
- .startSpan();
- span.setAttribute(RatisAttributes.PEER_ID, peerId);
- span.setAttribute(RatisAttributes.OPERATION_NAME, spanName);
- span.setAttribute(RatisAttributes.OPERATION_TYPE, String.valueOf(type));
- return span;
+ return TraceUtils.getProvider().traceClientSend(action, type, server);
}
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/trace/TraceProvider.java b/ratis-common/src/main/java/org/apache/ratis/trace/TraceProvider.java
new file mode 100644
index 0000000000..36b5b11d70
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/trace/TraceProvider.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.trace;
+
+import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.util.function.CheckedSupplier;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+public interface TraceProvider {
+ CompletableFuture traceClientSend(
+ CheckedSupplier, THROWABLE> action,
+ RaftClientRequest.Type type, RaftPeerId server) throws THROWABLE;
+
+ CompletableFuture traceServerRequest(
+ CheckedSupplier, THROWABLE> action,
+ RaftClientRequest request, String memberId, String spanName) throws THROWABLE;
+
+ CompletableFuture traceAppendEntries(
+ CheckedSupplier, IOException> action,
+ AppendEntriesRequestProto request, String memberId) throws IOException;
+}
diff --git a/ratis-common/src/main/java/org/apache/ratis/trace/TraceServer.java b/ratis-common/src/main/java/org/apache/ratis/trace/TraceServer.java
index 35f8fef1a5..9d127dbe90 100644
--- a/ratis-common/src/main/java/org/apache/ratis/trace/TraceServer.java
+++ b/ratis-common/src/main/java/org/apache/ratis/trace/TraceServer.java
@@ -17,20 +17,14 @@
*/
package org.apache.ratis.trace;
-import io.opentelemetry.api.trace.Span;
-import io.opentelemetry.api.trace.SpanKind;
-import io.opentelemetry.context.Context;
import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
-import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto;
-import org.apache.ratis.proto.RaftProtos.SpanContextProto;
import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.util.function.CheckedSupplier;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
-/** Server-side OpenTelemetry helpers. */
+/** Server-side tracing helpers. */
public final class TraceServer {
private TraceServer() {
}
@@ -41,25 +35,7 @@ private TraceServer() {
public static CompletableFuture traceAsyncMethod(
CheckedSupplier, THROWABLE> action,
RaftClientRequest request, String memberId, String spanName) throws THROWABLE {
- if (!TraceUtils.isEnabled()) {
- return action.get();
- }
- return TraceUtils.traceAsyncMethod(action,
- () -> createServerSpanFromClientRequest(request, memberId, spanName));
- }
-
- private static Span createServerSpanFromClientRequest(RaftClientRequest request, String memberId,
- String spanName) {
- final Context remoteContext = TraceUtils.extractContextFromProto(request.getSpanContext());
- final Span span = TraceUtils.getGlobalTracer()
- .spanBuilder(spanName)
- .setParent(remoteContext)
- .setSpanKind(SpanKind.SERVER)
- .startSpan();
- span.setAttribute(RatisAttributes.CLIENT_ID, String.valueOf(request.getClientId()));
- span.setAttribute(RatisAttributes.CALL_ID, String.valueOf(request.getCallId()));
- span.setAttribute(RatisAttributes.MEMBER_ID, memberId);
- return span;
+ return TraceUtils.getProvider().traceServerRequest(action, request, memberId, spanName);
}
/**
@@ -69,26 +45,6 @@ private static Span createServerSpanFromClientRequest(RaftClientRequest request,
public static CompletableFuture traceAppendEntriesAsync(
CheckedSupplier, IOException> action,
AppendEntriesRequestProto request, String memberId) throws IOException {
- if (!TraceUtils.isEnabled()) {
- return action.get();
- }
- final RaftRpcRequestProto rpc = request.getServerRequest();
- final SpanContextProto spanContext = rpc.getSpanContext();
- // If the leader sent no parent span context, still trace as a root span
- // rather than skipping tracing entirely.
- final Context remoteContext = (spanContext == null || spanContext.getContextMap().isEmpty())
- ? Context.root()
- : TraceUtils.extractContextFromProto(spanContext);
- return TraceUtils.traceAsyncMethod(action, () -> {
- final Span span = TraceUtils.getGlobalTracer()
- .spanBuilder(SpanNames.APPEND_ENTRIES_ASYNC)
- .setParent(remoteContext)
- .setSpanKind(SpanKind.INTERNAL)
- .startSpan();
- span.setAttribute(RatisAttributes.MEMBER_ID, memberId);
- span.setAttribute(RatisAttributes.PEER_ID, String.valueOf(RaftPeerId.valueOf(rpc.getRequestorId())));
- span.setAttribute(RatisAttributes.APPEND_ENTRIES_COUNT, (long) request.getEntriesCount());
- return span;
- });
+ return TraceUtils.getProvider().traceAppendEntries(action, request, memberId);
}
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/trace/TraceUtils.java b/ratis-common/src/main/java/org/apache/ratis/trace/TraceUtils.java
index 3dc3e228f1..a164184ea8 100644
--- a/ratis-common/src/main/java/org/apache/ratis/trace/TraceUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/trace/TraceUtils.java
@@ -17,46 +17,23 @@
*/
package org.apache.ratis.trace;
-import io.opentelemetry.api.GlobalOpenTelemetry;
-import io.opentelemetry.api.trace.Span;
-import io.opentelemetry.api.trace.StatusCode;
-import io.opentelemetry.api.trace.Tracer;
-import io.opentelemetry.context.Context;
-import io.opentelemetry.context.Scope;
-import io.opentelemetry.context.propagation.TextMapPropagator;
-import io.opentelemetry.context.propagation.TextMapGetter;
import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.proto.RaftProtos.SpanContextProto;
-import org.apache.ratis.util.JavaUtils;
-import org.apache.ratis.util.function.CheckedSupplier;
-import org.apache.ratis.util.VersionInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.ratis.trace.opentelemetry.OpenTelemetryTraceProvider;
-import java.util.Map;
-import java.util.Optional;
-import java.util.TreeMap;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiConsumer;
-import java.util.function.Supplier;
-/** Common OpenTelemetry utilities shared by {@link TraceClient} and {@link TraceServer}. */
+/** Common tracing utilities shared by {@link TraceClient} and {@link TraceServer}. */
public final class TraceUtils {
- private static final AtomicReference TRACER = new AtomicReference<>();
- private static final Logger LOG = LoggerFactory.getLogger(TraceUtils.class);
+ private static final AtomicReference PROVIDER =
+ new AtomicReference<>(NoOpTraceProvider.INSTANCE);
private TraceUtils() {
}
- public static Tracer getGlobalTracer() {
- return TRACER.get();
- }
-
/**
- * Initializes the global tracer from configuration when tracing is enabled, or clears it when
- * disabled. Call from {@link org.apache.ratis.server.RaftServer} and
+ * Initializes tracing from configuration when tracing is enabled, or clears it when disabled.
+ * Call from {@link org.apache.ratis.server.RaftServer} and
* {@link org.apache.ratis.client.RaftClient} construction so tracing follows
* {@link TraceConfigKeys}.
*
@@ -67,130 +44,29 @@ public static void setTracerWhenEnabled(RaftProperties properties) {
}
/**
- * Enables or disables the tracer without reading {@link RaftProperties}. Intended for tests and
+ * Enables or disables tracing without reading {@link RaftProperties}. Intended for tests and
* simple toggles; production code should prefer {@link #setTracerWhenEnabled(RaftProperties)}.
*
- * @param enabled when true, lazily obtains the OpenTelemetry tracer; when false, clears it
+ * @param enabled when true, enables the OpenTelemetry provider; when false, clears it
*/
public static void setTracerWhenEnabled(boolean enabled) {
- if (enabled) {
- TRACER.updateAndGet(previous -> previous != null ? previous
- : GlobalOpenTelemetry.getTracer("org.apache.ratis", VersionInfo.getSoftwareInfoVersion()));
- } else {
- TRACER.set(null);
- }
+ PROVIDER.set(enabled ? newOpenTelemetryTraceProvider() : NoOpTraceProvider.INSTANCE);
}
public static boolean isEnabled() {
- return TRACER.get() != null;
+ return !(getProvider() instanceof NoOpTraceProvider);
}
- /**
- * Traces an asynchronous operation represented by a {@link CompletableFuture}. The returned future
- * completes with the same outcome as the supplied future; the span is ended when that future
- * completes.
- */
- static CompletableFuture traceAsyncMethod(
- CheckedSupplier, THROWABLE> action, Supplier spanSupplier) throws THROWABLE {
- final Span span = spanSupplier.get();
- try (Scope ignored = span.makeCurrent()) {
- final CompletableFuture future;
- try {
- future = action.get();
- } catch (RuntimeException | Error e) {
- setError(span, e);
- span.end();
- throw e;
- } catch (Throwable t) {
- setError(span, t);
- span.end();
- throw JavaUtils.cast(t);
- }
- endSpan(future, span);
- return future;
- }
+ static TraceProvider getProvider() {
+ return PROVIDER.get();
}
- private static void endSpan(CompletableFuture> future, Span span) {
- if (span == null) {
- LOG.debug("Span is null, cannot trace the future {}", future);
- return;
+ private static TraceProvider newOpenTelemetryTraceProvider() {
+ try {
+ return new OpenTelemetryTraceProvider();
+ } catch (Throwable e) {
+ throw new IllegalStateException(
+ "OpenTelemetry tracing is enabled but OpenTelemetry is not available; tracing is disabled", e);
}
- addListener(future, (resp, error) -> {
- try {
- if (error != null) {
- setError(span, error);
- } else {
- span.setStatus(StatusCode.OK);
- }
- } catch (Throwable t) {
- LOG.error("Error setting span status, ending span anyway", t);
- } finally {
- span.end();
- }
- });
}
-
- public static void setError(Span span, Throwable error) {
- span.recordException(error);
- span.setStatus(StatusCode.ERROR);
- }
-
- /**
- * This is method is used when you just want to add a listener to the given future. We will call
- * {@link CompletableFuture#whenComplete(BiConsumer)} to register the {@code action} to the
- * {@code future}. Ignoring the return value of a Future is considered as a bad practice as it may
- * suppress exceptions thrown from the code that completes the future, and this method will catch
- * all the exception thrown from the {@code action} to catch possible code bugs.
- *
- * And the error phone check will always report FutureReturnValueIgnored because every method in
- * the {@link CompletableFuture} class will return a new {@link CompletableFuture}, so you always
- * have one future that has not been checked. So we introduce this method and add a suppression
- * warnings annotation here.
- */
- @SuppressWarnings("FutureReturnValueIgnored")
- private static void addListener(CompletableFuture future,
- BiConsumer super T, ? super Throwable> action) {
- future.whenComplete((resp, error) -> {
- try {
- // https://s.apache.org/completionexception — unwrap CompletionException for callers
- action.accept(resp, error == null ? null : JavaUtils.unwrapCompletionException(error));
- } catch (Throwable t) {
- LOG.error("Unexpected error caught when processing CompletableFuture", t);
- }
- });
- }
-
- private static final TextMapPropagator PROPAGATOR =
- GlobalOpenTelemetry.getPropagators().getTextMapPropagator();
-
- public static SpanContextProto injectContextToProto(Context context) {
- Map carrier = new TreeMap<>();
- PROPAGATOR.inject(context, carrier, (map, key, value) -> map.put(key, value));
- return SpanContextProto.newBuilder().putAllContext(carrier).build();
- }
-
- public static Context extractContextFromProto(SpanContextProto proto) {
- if (proto == null || proto.getContextMap().isEmpty()) {
- return Context.current();
- }
- final TextMapGetter getter = SpanContextGetter.INSTANCE;
- return PROPAGATOR.extract(Context.current(), proto, getter);
- }
-}
-
-class SpanContextGetter implements TextMapGetter {
- static final SpanContextGetter INSTANCE = new SpanContextGetter();
-
- @Override
- public Iterable keys(SpanContextProto carrier) {
- return carrier.getContextMap().keySet();
- }
-
- @Override
- public String get(SpanContextProto carrier, String key) {
- return Optional.ofNullable(carrier).map(SpanContextProto::getContextMap)
- .map(map -> map.get(key)).orElse(null);
- }
-
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/trace/opentelemetry/OpenTelemetryTraceProvider.java b/ratis-common/src/main/java/org/apache/ratis/trace/opentelemetry/OpenTelemetryTraceProvider.java
new file mode 100644
index 0000000000..a1059d9bf2
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/trace/opentelemetry/OpenTelemetryTraceProvider.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.trace.opentelemetry;
+
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
+import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
+import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto;
+import org.apache.ratis.proto.RaftProtos.SpanContextProto;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.trace.RatisAttributes;
+import org.apache.ratis.trace.SpanNames;
+import org.apache.ratis.trace.TraceProvider;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.VersionInfo;
+import org.apache.ratis.util.function.CheckedSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+
+public final class OpenTelemetryTraceProvider implements TraceProvider {
+ private static final Logger LOG = LoggerFactory.getLogger(OpenTelemetryTraceProvider.class);
+ private static final String LEADER = "LEADER";
+
+ private final Tracer tracer = Objects.requireNonNull(
+ GlobalOpenTelemetry.getTracer("org.apache.ratis", VersionInfo.getSoftwareInfoVersion()),
+ "tracer == null");
+
+ @Override
+ public CompletableFuture traceClientSend(
+ CheckedSupplier, THROWABLE> action,
+ RaftClientRequest.Type type, RaftPeerId server) throws THROWABLE {
+ return traceAsyncMethod(action, () -> createClientOperationSpan(type, server, SpanNames.ASYNC_SEND));
+ }
+
+ @Override
+ public CompletableFuture traceServerRequest(
+ CheckedSupplier, THROWABLE> action,
+ RaftClientRequest request, String memberId, String spanName) throws THROWABLE {
+ return traceAsyncMethod(action, () -> createServerSpanFromClientRequest(request, memberId, spanName));
+ }
+
+ @Override
+ public CompletableFuture traceAppendEntries(
+ CheckedSupplier, IOException> action,
+ AppendEntriesRequestProto request, String memberId) throws IOException {
+ final RaftRpcRequestProto rpc = request.getServerRequest();
+ final SpanContextProto spanContext = rpc.getSpanContext();
+ final Context remoteContext = (spanContext == null || spanContext.getContextMap().isEmpty())
+ ? Context.root()
+ : OpenTelemetryTraceUtils.extractContextFromProto(spanContext);
+ return traceAsyncMethod(action, () -> {
+ final Span span = tracer.spanBuilder(SpanNames.APPEND_ENTRIES_ASYNC)
+ .setParent(remoteContext)
+ .setSpanKind(SpanKind.INTERNAL)
+ .startSpan();
+ span.setAttribute(RatisAttributes.MEMBER_ID, memberId);
+ span.setAttribute(RatisAttributes.PEER_ID, String.valueOf(RaftPeerId.valueOf(rpc.getRequestorId())));
+ span.setAttribute(RatisAttributes.APPEND_ENTRIES_COUNT, (long) request.getEntriesCount());
+ return span;
+ });
+ }
+
+ private Span createClientOperationSpan(RaftClientRequest.Type type, RaftPeerId server, String spanName) {
+ Preconditions.assertNotNull(spanName, () -> "Span name cannot be null");
+ Preconditions.assertTrue(!spanName.isEmpty(), "Span name should not be empty");
+ final String peerId = server == null ? LEADER : String.valueOf(server);
+ final Span span = tracer.spanBuilder(spanName)
+ .setSpanKind(SpanKind.CLIENT)
+ .startSpan();
+ span.setAttribute(RatisAttributes.PEER_ID, peerId);
+ span.setAttribute(RatisAttributes.OPERATION_NAME, spanName);
+ span.setAttribute(RatisAttributes.OPERATION_TYPE, String.valueOf(type));
+ return span;
+ }
+
+ private Span createServerSpanFromClientRequest(RaftClientRequest request, String memberId, String spanName) {
+ final Context remoteContext = OpenTelemetryTraceUtils.extractContextFromProto(request.getSpanContext());
+ final Span span = tracer.spanBuilder(spanName)
+ .setParent(remoteContext)
+ .setSpanKind(SpanKind.SERVER)
+ .startSpan();
+ span.setAttribute(RatisAttributes.CLIENT_ID, String.valueOf(request.getClientId()));
+ span.setAttribute(RatisAttributes.CALL_ID, String.valueOf(request.getCallId()));
+ span.setAttribute(RatisAttributes.MEMBER_ID, memberId);
+ return span;
+ }
+
+ @SuppressWarnings("try")
+ private static CompletableFuture traceAsyncMethod(
+ CheckedSupplier, THROWABLE> action, Supplier spanSupplier) throws THROWABLE {
+ final Span span = spanSupplier.get();
+ try (Scope ignored = span.makeCurrent()) {
+ final CompletableFuture future;
+ try {
+ future = action.get();
+ } catch (RuntimeException | Error e) {
+ setError(span, e);
+ span.end();
+ throw e;
+ } catch (Throwable t) {
+ setError(span, t);
+ span.end();
+ throw JavaUtils.cast(t);
+ }
+ endSpan(future, span);
+ return future;
+ }
+ }
+
+ private static void endSpan(CompletableFuture> future, Span span) {
+ addListener(future, (resp, error) -> {
+ try {
+ if (error != null) {
+ setError(span, error);
+ } else {
+ span.setStatus(StatusCode.OK);
+ }
+ } catch (Throwable t) {
+ LOG.error("Error setting span status, ending span anyway", t);
+ } finally {
+ span.end();
+ }
+ });
+ }
+
+ private static void setError(Span span, Throwable error) {
+ span.recordException(error);
+ span.setStatus(StatusCode.ERROR);
+ }
+
+ /**
+ * This is method is used when you just want to add a listener to the given future. We will call
+ * {@link CompletableFuture#whenComplete(BiConsumer)} to register the {@code action} to the
+ * {@code future}. Ignoring the return value of a Future is considered as a bad practice as it may
+ * suppress exceptions thrown from the code that completes the future, and this method will catch
+ * all the exception thrown from the {@code action} to catch possible code bugs.
+ *
+ * And the error phone check will always report FutureReturnValueIgnored because every method in
+ * the {@link CompletableFuture} class will return a new {@link CompletableFuture}, so you always
+ * have one future that has not been checked. So we introduce this method and add a suppression
+ * warnings annotation here.
+ */
+ @SuppressWarnings("FutureReturnValueIgnored")
+ private static void addListener(CompletableFuture future,
+ BiConsumer super T, ? super Throwable> action) {
+ future.whenComplete((resp, error) -> {
+ try {
+ action.accept(resp, error == null ? null : JavaUtils.unwrapCompletionException(error));
+ } catch (Throwable t) {
+ LOG.error("Unexpected error caught when processing CompletableFuture", t);
+ }
+ });
+ }
+
+}
diff --git a/ratis-common/src/main/java/org/apache/ratis/trace/opentelemetry/OpenTelemetryTraceUtils.java b/ratis-common/src/main/java/org/apache/ratis/trace/opentelemetry/OpenTelemetryTraceUtils.java
new file mode 100644
index 0000000000..b5f0c8c249
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/trace/opentelemetry/OpenTelemetryTraceUtils.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.trace.opentelemetry;
+
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.propagation.TextMapGetter;
+import io.opentelemetry.context.propagation.TextMapPropagator;
+import org.apache.ratis.proto.RaftProtos.SpanContextProto;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.TreeMap;
+
+/** OpenTelemetry-specific helpers. Callers using this class must provide OpenTelemetry jars. */
+public final class OpenTelemetryTraceUtils {
+ private OpenTelemetryTraceUtils() {
+ }
+
+ public static SpanContextProto injectContextToProto(Context context) {
+ final Map carrier = new TreeMap<>();
+ getTextMapPropagator().inject(context, carrier, (map, key, value) -> map.put(key, value));
+ return SpanContextProto.newBuilder().putAllContext(carrier).build();
+ }
+
+ static Context extractContextFromProto(SpanContextProto proto) {
+ if (proto == null || proto.getContextMap().isEmpty()) {
+ return Context.current();
+ }
+ return getTextMapPropagator().extract(Context.current(), proto, SpanContextGetter.INSTANCE);
+ }
+
+ private static TextMapPropagator getTextMapPropagator() {
+ return GlobalOpenTelemetry.getPropagators().getTextMapPropagator();
+ }
+
+ private static final class SpanContextGetter implements TextMapGetter {
+ private static final SpanContextGetter INSTANCE = new SpanContextGetter();
+
+ @Override
+ public Iterable keys(SpanContextProto carrier) {
+ return carrier.getContextMap().keySet();
+ }
+
+ @Override
+ public String get(SpanContextProto carrier, String key) {
+ return Optional.ofNullable(carrier).map(SpanContextProto::getContextMap)
+ .map(map -> map.get(key)).orElse(null);
+ }
+ }
+}
diff --git a/ratis-server/pom.xml b/ratis-server/pom.xml
index 2c0bc93a25..74cd7c5462 100644
--- a/ratis-server/pom.xml
+++ b/ratis-server/pom.xml
@@ -84,6 +84,11 @@
mockito-core
test
+
+ io.opentelemetry
+ opentelemetry-sdk-testing
+ test
+
org.apache.ratis
ratis-metrics-api
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerImplTracingTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerImplTracingTests.java
index 81e9289f42..db0c466b73 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerImplTracingTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerImplTracingTests.java
@@ -43,6 +43,7 @@
import org.apache.ratis.trace.TraceConfigKeys;
import org.apache.ratis.trace.TraceServer;
import org.apache.ratis.trace.TraceUtils;
+import org.apache.ratis.trace.opentelemetry.OpenTelemetryTraceUtils;
import org.apache.ratis.util.JavaUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -182,7 +183,7 @@ private static SpanContextProto injectedSpanContext() {
.setSpanKind(SpanKind.CLIENT)
.startSpan();
try {
- return TraceUtils.injectContextToProto(Context.current().with(remoteParent));
+ return OpenTelemetryTraceUtils.injectContextToProto(Context.current().with(remoteParent));
} finally {
remoteParent.end();
}
@@ -253,7 +254,7 @@ private static RaftClientRequest newRaftClientRequest(RaftClientRequest.Type typ
.setGroupId(RaftGroupId.randomId())
.setCallId(1L)
.setType(type)
- .setSpanContext(TraceUtils.injectContextToProto(clientContext))
+ .setSpanContext(OpenTelemetryTraceUtils.injectContextToProto(clientContext))
.build();
} finally {
clientSpan.end();
@@ -265,4 +266,3 @@ private long randomCallId() {
}
}
-
diff --git a/ratis-test/pom.xml b/ratis-test/pom.xml
index 577262d84c..54f076ac1a 100644
--- a/ratis-test/pom.xml
+++ b/ratis-test/pom.xml
@@ -156,5 +156,10 @@
mockito-core
test
+
+ io.opentelemetry
+ opentelemetry-sdk-testing
+ test
+