diff --git a/pom.xml b/pom.xml index 44c9b449bd..dd320dc72b 100644 --- a/pom.xml +++ b/pom.xml @@ -183,7 +183,6 @@ 1.62.0 - 1.41.1 true _ @@ -394,21 +393,11 @@ opentelemetry-api ${opentelemetry.version} - - io.opentelemetry - opentelemetry-sdk - ${opentelemetry.version} - io.opentelemetry opentelemetry-sdk-testing ${opentelemetry.version} - - io.opentelemetry.semconv - opentelemetry-semconv - ${opentelemetry-semconv.version} - io.opentelemetry opentelemetry-context diff --git a/ratis-common/pom.xml b/ratis-common/pom.xml index ba19c73e33..1d7a69fa58 100644 --- a/ratis-common/pom.xml +++ b/ratis-common/pom.xml @@ -46,17 +46,10 @@ io.opentelemetry opentelemetry-context - - io.opentelemetry - opentelemetry-sdk - io.opentelemetry opentelemetry-sdk-testing - - - io.opentelemetry.semconv - opentelemetry-semconv + test diff --git a/ratis-common/src/main/java/org/apache/ratis/trace/NoOpTraceProvider.java b/ratis-common/src/main/java/org/apache/ratis/trace/NoOpTraceProvider.java new file mode 100644 index 0000000000..66451d5c00 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/trace/NoOpTraceProvider.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.trace; + +import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto; +import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.util.function.CheckedSupplier; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +enum NoOpTraceProvider implements TraceProvider { + INSTANCE; + + @Override + public CompletableFuture traceClientSend( + CheckedSupplier, THROWABLE> action, + RaftClientRequest.Type type, RaftPeerId server) throws THROWABLE { + return action.get(); + } + + @Override + public CompletableFuture traceServerRequest( + CheckedSupplier, THROWABLE> action, + RaftClientRequest request, String memberId, String spanName) throws THROWABLE { + return action.get(); + } + + @Override + public CompletableFuture traceAppendEntries( + CheckedSupplier, IOException> action, + AppendEntriesRequestProto request, String memberId) throws IOException { + return action.get(); + } +} diff --git a/ratis-common/src/main/java/org/apache/ratis/trace/TraceClient.java b/ratis-common/src/main/java/org/apache/ratis/trace/TraceClient.java index e7145eb947..a4309f6a50 100644 --- a/ratis-common/src/main/java/org/apache/ratis/trace/TraceClient.java +++ b/ratis-common/src/main/java/org/apache/ratis/trace/TraceClient.java @@ -17,19 +17,14 @@ */ package org.apache.ratis.trace; -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.api.trace.SpanKind; import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.protocol.RaftPeerId; -import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.function.CheckedSupplier; import java.util.concurrent.CompletableFuture; -/** Client-side OpenTelemetry helpers. */ +/** Client-side tracing helpers. 
*/ public final class TraceClient { - private static final String LEADER = "LEADER"; - private TraceClient() { } @@ -39,25 +34,6 @@ private TraceClient() { public static CompletableFuture asyncSend( CheckedSupplier, THROWABLE> action, RaftClientRequest.Type type, RaftPeerId server) throws THROWABLE { - if (!TraceUtils.isEnabled()) { - return action.get(); - } - return TraceUtils.traceAsyncMethod(action, - () -> createClientOperationSpan(type, server, SpanNames.ASYNC_SEND)); - } - - private static Span createClientOperationSpan(RaftClientRequest.Type type, RaftPeerId server, - String spanName) { - Preconditions.assertNotNull(spanName, () -> "Span name cannot be null"); - Preconditions.assertTrue(!spanName.isEmpty(), "Span name should not be empty"); - String peerId = server == null ? LEADER : String.valueOf(server); - final Span span = TraceUtils.getGlobalTracer() - .spanBuilder(spanName) - .setSpanKind(SpanKind.CLIENT) - .startSpan(); - span.setAttribute(RatisAttributes.PEER_ID, peerId); - span.setAttribute(RatisAttributes.OPERATION_NAME, spanName); - span.setAttribute(RatisAttributes.OPERATION_TYPE, String.valueOf(type)); - return span; + return TraceUtils.getProvider().traceClientSend(action, type, server); } } diff --git a/ratis-common/src/main/java/org/apache/ratis/trace/TraceProvider.java b/ratis-common/src/main/java/org/apache/ratis/trace/TraceProvider.java new file mode 100644 index 0000000000..36b5b11d70 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/trace/TraceProvider.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.trace; + +import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto; +import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.util.function.CheckedSupplier; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; + +public interface TraceProvider { + CompletableFuture traceClientSend( + CheckedSupplier, THROWABLE> action, + RaftClientRequest.Type type, RaftPeerId server) throws THROWABLE; + + CompletableFuture traceServerRequest( + CheckedSupplier, THROWABLE> action, + RaftClientRequest request, String memberId, String spanName) throws THROWABLE; + + CompletableFuture traceAppendEntries( + CheckedSupplier, IOException> action, + AppendEntriesRequestProto request, String memberId) throws IOException; +} diff --git a/ratis-common/src/main/java/org/apache/ratis/trace/TraceServer.java b/ratis-common/src/main/java/org/apache/ratis/trace/TraceServer.java index 35f8fef1a5..9d127dbe90 100644 --- a/ratis-common/src/main/java/org/apache/ratis/trace/TraceServer.java +++ b/ratis-common/src/main/java/org/apache/ratis/trace/TraceServer.java @@ -17,20 +17,14 @@ */ package org.apache.ratis.trace; -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.api.trace.SpanKind; -import io.opentelemetry.context.Context; import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto; -import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto; -import org.apache.ratis.proto.RaftProtos.SpanContextProto; import 
org.apache.ratis.protocol.RaftClientRequest; -import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.util.function.CheckedSupplier; import java.io.IOException; import java.util.concurrent.CompletableFuture; -/** Server-side OpenTelemetry helpers. */ +/** Server-side tracing helpers. */ public final class TraceServer { private TraceServer() { } @@ -41,25 +35,7 @@ private TraceServer() { public static CompletableFuture traceAsyncMethod( CheckedSupplier, THROWABLE> action, RaftClientRequest request, String memberId, String spanName) throws THROWABLE { - if (!TraceUtils.isEnabled()) { - return action.get(); - } - return TraceUtils.traceAsyncMethod(action, - () -> createServerSpanFromClientRequest(request, memberId, spanName)); - } - - private static Span createServerSpanFromClientRequest(RaftClientRequest request, String memberId, - String spanName) { - final Context remoteContext = TraceUtils.extractContextFromProto(request.getSpanContext()); - final Span span = TraceUtils.getGlobalTracer() - .spanBuilder(spanName) - .setParent(remoteContext) - .setSpanKind(SpanKind.SERVER) - .startSpan(); - span.setAttribute(RatisAttributes.CLIENT_ID, String.valueOf(request.getClientId())); - span.setAttribute(RatisAttributes.CALL_ID, String.valueOf(request.getCallId())); - span.setAttribute(RatisAttributes.MEMBER_ID, memberId); - return span; + return TraceUtils.getProvider().traceServerRequest(action, request, memberId, spanName); } /** @@ -69,26 +45,6 @@ private static Span createServerSpanFromClientRequest(RaftClientRequest request, public static CompletableFuture traceAppendEntriesAsync( CheckedSupplier, IOException> action, AppendEntriesRequestProto request, String memberId) throws IOException { - if (!TraceUtils.isEnabled()) { - return action.get(); - } - final RaftRpcRequestProto rpc = request.getServerRequest(); - final SpanContextProto spanContext = rpc.getSpanContext(); - // If the leader sent no parent span context, still trace as a root span - // rather 
than skipping tracing entirely. - final Context remoteContext = (spanContext == null || spanContext.getContextMap().isEmpty()) - ? Context.root() - : TraceUtils.extractContextFromProto(spanContext); - return TraceUtils.traceAsyncMethod(action, () -> { - final Span span = TraceUtils.getGlobalTracer() - .spanBuilder(SpanNames.APPEND_ENTRIES_ASYNC) - .setParent(remoteContext) - .setSpanKind(SpanKind.INTERNAL) - .startSpan(); - span.setAttribute(RatisAttributes.MEMBER_ID, memberId); - span.setAttribute(RatisAttributes.PEER_ID, String.valueOf(RaftPeerId.valueOf(rpc.getRequestorId()))); - span.setAttribute(RatisAttributes.APPEND_ENTRIES_COUNT, (long) request.getEntriesCount()); - return span; - }); + return TraceUtils.getProvider().traceAppendEntries(action, request, memberId); } } diff --git a/ratis-common/src/main/java/org/apache/ratis/trace/TraceUtils.java b/ratis-common/src/main/java/org/apache/ratis/trace/TraceUtils.java index 3dc3e228f1..a164184ea8 100644 --- a/ratis-common/src/main/java/org/apache/ratis/trace/TraceUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/trace/TraceUtils.java @@ -17,46 +17,23 @@ */ package org.apache.ratis.trace; -import io.opentelemetry.api.GlobalOpenTelemetry; -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.api.trace.StatusCode; -import io.opentelemetry.api.trace.Tracer; -import io.opentelemetry.context.Context; -import io.opentelemetry.context.Scope; -import io.opentelemetry.context.propagation.TextMapPropagator; -import io.opentelemetry.context.propagation.TextMapGetter; import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.proto.RaftProtos.SpanContextProto; -import org.apache.ratis.util.JavaUtils; -import org.apache.ratis.util.function.CheckedSupplier; -import org.apache.ratis.util.VersionInfo; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.ratis.trace.opentelemetry.OpenTelemetryTraceProvider; -import java.util.Map; -import java.util.Optional; -import 
java.util.TreeMap; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; -import java.util.function.Supplier; -/** Common OpenTelemetry utilities shared by {@link TraceClient} and {@link TraceServer}. */ +/** Common tracing utilities shared by {@link TraceClient} and {@link TraceServer}. */ public final class TraceUtils { - private static final AtomicReference TRACER = new AtomicReference<>(); - private static final Logger LOG = LoggerFactory.getLogger(TraceUtils.class); + private static final AtomicReference PROVIDER = + new AtomicReference<>(NoOpTraceProvider.INSTANCE); private TraceUtils() { } - public static Tracer getGlobalTracer() { - return TRACER.get(); - } - /** - * Initializes the global tracer from configuration when tracing is enabled, or clears it when - * disabled. Call from {@link org.apache.ratis.server.RaftServer} and + * Initializes tracing from configuration when tracing is enabled, or clears it when disabled. + * Call from {@link org.apache.ratis.server.RaftServer} and * {@link org.apache.ratis.client.RaftClient} construction so tracing follows * {@link TraceConfigKeys}. * @@ -67,130 +44,29 @@ public static void setTracerWhenEnabled(RaftProperties properties) { } /** - * Enables or disables the tracer without reading {@link RaftProperties}. Intended for tests and + * Enables or disables tracing without reading {@link RaftProperties}. Intended for tests and * simple toggles; production code should prefer {@link #setTracerWhenEnabled(RaftProperties)}. * - * @param enabled when true, lazily obtains the OpenTelemetry tracer; when false, clears it + * @param enabled when true, enables the OpenTelemetry provider; when false, clears it */ public static void setTracerWhenEnabled(boolean enabled) { - if (enabled) { - TRACER.updateAndGet(previous -> previous != null ? 
previous - : GlobalOpenTelemetry.getTracer("org.apache.ratis", VersionInfo.getSoftwareInfoVersion())); - } else { - TRACER.set(null); - } + PROVIDER.set(enabled ? newOpenTelemetryTraceProvider() : NoOpTraceProvider.INSTANCE); } public static boolean isEnabled() { - return TRACER.get() != null; + return !(getProvider() instanceof NoOpTraceProvider); } - /** - * Traces an asynchronous operation represented by a {@link CompletableFuture}. The returned future - * completes with the same outcome as the supplied future; the span is ended when that future - * completes. - */ - static CompletableFuture traceAsyncMethod( - CheckedSupplier, THROWABLE> action, Supplier spanSupplier) throws THROWABLE { - final Span span = spanSupplier.get(); - try (Scope ignored = span.makeCurrent()) { - final CompletableFuture future; - try { - future = action.get(); - } catch (RuntimeException | Error e) { - setError(span, e); - span.end(); - throw e; - } catch (Throwable t) { - setError(span, t); - span.end(); - throw JavaUtils.cast(t); - } - endSpan(future, span); - return future; - } + static TraceProvider getProvider() { + return PROVIDER.get(); } - private static void endSpan(CompletableFuture future, Span span) { - if (span == null) { - LOG.debug("Span is null, cannot trace the future {}", future); - return; + private static TraceProvider newOpenTelemetryTraceProvider() { + try { + return new OpenTelemetryTraceProvider(); + } catch (Throwable e) { + throw new IllegalStateException( + "OpenTelemetry tracing is enabled but OpenTelemetry is not available; tracing is disabled", e); } - addListener(future, (resp, error) -> { - try { - if (error != null) { - setError(span, error); - } else { - span.setStatus(StatusCode.OK); - } - } catch (Throwable t) { - LOG.error("Error setting span status, ending span anyway", t); - } finally { - span.end(); - } - }); } - - public static void setError(Span span, Throwable error) { - span.recordException(error); - span.setStatus(StatusCode.ERROR); - } - - 
/** - * This is method is used when you just want to add a listener to the given future. We will call - * {@link CompletableFuture#whenComplete(BiConsumer)} to register the {@code action} to the - * {@code future}. Ignoring the return value of a Future is considered as a bad practice as it may - * suppress exceptions thrown from the code that completes the future, and this method will catch - * all the exception thrown from the {@code action} to catch possible code bugs. - *

- * And the error phone check will always report FutureReturnValueIgnored because every method in - * the {@link CompletableFuture} class will return a new {@link CompletableFuture}, so you always - * have one future that has not been checked. So we introduce this method and add a suppression - * warnings annotation here. - */ - @SuppressWarnings("FutureReturnValueIgnored") - private static void addListener(CompletableFuture future, - BiConsumer action) { - future.whenComplete((resp, error) -> { - try { - // https://s.apache.org/completionexception — unwrap CompletionException for callers - action.accept(resp, error == null ? null : JavaUtils.unwrapCompletionException(error)); - } catch (Throwable t) { - LOG.error("Unexpected error caught when processing CompletableFuture", t); - } - }); - } - - private static final TextMapPropagator PROPAGATOR = - GlobalOpenTelemetry.getPropagators().getTextMapPropagator(); - - public static SpanContextProto injectContextToProto(Context context) { - Map carrier = new TreeMap<>(); - PROPAGATOR.inject(context, carrier, (map, key, value) -> map.put(key, value)); - return SpanContextProto.newBuilder().putAllContext(carrier).build(); - } - - public static Context extractContextFromProto(SpanContextProto proto) { - if (proto == null || proto.getContextMap().isEmpty()) { - return Context.current(); - } - final TextMapGetter getter = SpanContextGetter.INSTANCE; - return PROPAGATOR.extract(Context.current(), proto, getter); - } -} - -class SpanContextGetter implements TextMapGetter { - static final SpanContextGetter INSTANCE = new SpanContextGetter(); - - @Override - public Iterable keys(SpanContextProto carrier) { - return carrier.getContextMap().keySet(); - } - - @Override - public String get(SpanContextProto carrier, String key) { - return Optional.ofNullable(carrier).map(SpanContextProto::getContextMap) - .map(map -> map.get(key)).orElse(null); - } - } diff --git 
a/ratis-common/src/main/java/org/apache/ratis/trace/opentelemetry/OpenTelemetryTraceProvider.java b/ratis-common/src/main/java/org/apache/ratis/trace/opentelemetry/OpenTelemetryTraceProvider.java new file mode 100644 index 0000000000..a1059d9bf2 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/trace/opentelemetry/OpenTelemetryTraceProvider.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.trace.opentelemetry; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto; +import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto; +import org.apache.ratis.proto.RaftProtos.SpanContextProto; +import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.trace.RatisAttributes; +import org.apache.ratis.trace.SpanNames; +import org.apache.ratis.trace.TraceProvider; +import org.apache.ratis.util.JavaUtils; +import org.apache.ratis.util.Preconditions; +import org.apache.ratis.util.VersionInfo; +import org.apache.ratis.util.function.CheckedSupplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; +import java.util.function.Supplier; + +public final class OpenTelemetryTraceProvider implements TraceProvider { + private static final Logger LOG = LoggerFactory.getLogger(OpenTelemetryTraceProvider.class); + private static final String LEADER = "LEADER"; + + private final Tracer tracer = Objects.requireNonNull( + GlobalOpenTelemetry.getTracer("org.apache.ratis", VersionInfo.getSoftwareInfoVersion()), + "tracer == null"); + + @Override + public CompletableFuture traceClientSend( + CheckedSupplier, THROWABLE> action, + RaftClientRequest.Type type, RaftPeerId server) throws THROWABLE { + return traceAsyncMethod(action, () -> createClientOperationSpan(type, server, SpanNames.ASYNC_SEND)); + } + + @Override + public CompletableFuture traceServerRequest( + CheckedSupplier, THROWABLE> action, + RaftClientRequest 
request, String memberId, String spanName) throws THROWABLE { + return traceAsyncMethod(action, () -> createServerSpanFromClientRequest(request, memberId, spanName)); + } + + @Override + public CompletableFuture traceAppendEntries( + CheckedSupplier, IOException> action, + AppendEntriesRequestProto request, String memberId) throws IOException { + final RaftRpcRequestProto rpc = request.getServerRequest(); + final SpanContextProto spanContext = rpc.getSpanContext(); + final Context remoteContext = (spanContext == null || spanContext.getContextMap().isEmpty()) + ? Context.root() + : OpenTelemetryTraceUtils.extractContextFromProto(spanContext); + return traceAsyncMethod(action, () -> { + final Span span = tracer.spanBuilder(SpanNames.APPEND_ENTRIES_ASYNC) + .setParent(remoteContext) + .setSpanKind(SpanKind.INTERNAL) + .startSpan(); + span.setAttribute(RatisAttributes.MEMBER_ID, memberId); + span.setAttribute(RatisAttributes.PEER_ID, String.valueOf(RaftPeerId.valueOf(rpc.getRequestorId()))); + span.setAttribute(RatisAttributes.APPEND_ENTRIES_COUNT, (long) request.getEntriesCount()); + return span; + }); + } + + private Span createClientOperationSpan(RaftClientRequest.Type type, RaftPeerId server, String spanName) { + Preconditions.assertNotNull(spanName, () -> "Span name cannot be null"); + Preconditions.assertTrue(!spanName.isEmpty(), "Span name should not be empty"); + final String peerId = server == null ? 
LEADER : String.valueOf(server); + final Span span = tracer.spanBuilder(spanName) + .setSpanKind(SpanKind.CLIENT) + .startSpan(); + span.setAttribute(RatisAttributes.PEER_ID, peerId); + span.setAttribute(RatisAttributes.OPERATION_NAME, spanName); + span.setAttribute(RatisAttributes.OPERATION_TYPE, String.valueOf(type)); + return span; + } + + private Span createServerSpanFromClientRequest(RaftClientRequest request, String memberId, String spanName) { + final Context remoteContext = OpenTelemetryTraceUtils.extractContextFromProto(request.getSpanContext()); + final Span span = tracer.spanBuilder(spanName) + .setParent(remoteContext) + .setSpanKind(SpanKind.SERVER) + .startSpan(); + span.setAttribute(RatisAttributes.CLIENT_ID, String.valueOf(request.getClientId())); + span.setAttribute(RatisAttributes.CALL_ID, String.valueOf(request.getCallId())); + span.setAttribute(RatisAttributes.MEMBER_ID, memberId); + return span; + } + + @SuppressWarnings("try") + private static CompletableFuture traceAsyncMethod( + CheckedSupplier, THROWABLE> action, Supplier spanSupplier) throws THROWABLE { + final Span span = spanSupplier.get(); + try (Scope ignored = span.makeCurrent()) { + final CompletableFuture future; + try { + future = action.get(); + } catch (RuntimeException | Error e) { + setError(span, e); + span.end(); + throw e; + } catch (Throwable t) { + setError(span, t); + span.end(); + throw JavaUtils.cast(t); + } + endSpan(future, span); + return future; + } + } + + private static void endSpan(CompletableFuture future, Span span) { + addListener(future, (resp, error) -> { + try { + if (error != null) { + setError(span, error); + } else { + span.setStatus(StatusCode.OK); + } + } catch (Throwable t) { + LOG.error("Error setting span status, ending span anyway", t); + } finally { + span.end(); + } + }); + } + + private static void setError(Span span, Throwable error) { + span.recordException(error); + span.setStatus(StatusCode.ERROR); + } + + /** + * This method is used when 
you just want to add a listener to the given future. We will call + * {@link CompletableFuture#whenComplete(BiConsumer)} to register the {@code action} to the + * {@code future}. Ignoring the return value of a Future is considered as a bad practice as it may + * suppress exceptions thrown from the code that completes the future, and this method will catch + * all the exception thrown from the {@code action} to catch possible code bugs. + *

+ * And the error prone check will always report FutureReturnValueIgnored because every method in + * the {@link CompletableFuture} class will return a new {@link CompletableFuture}, so you always + * have one future that has not been checked. So we introduce this method and add a suppression + * warnings annotation here. + */ + @SuppressWarnings("FutureReturnValueIgnored") + private static void addListener(CompletableFuture future, + BiConsumer action) { + future.whenComplete((resp, error) -> { + try { + action.accept(resp, error == null ? null : JavaUtils.unwrapCompletionException(error)); + } catch (Throwable t) { + LOG.error("Unexpected error caught when processing CompletableFuture", t); + } + }); + } + +} diff --git a/ratis-common/src/main/java/org/apache/ratis/trace/opentelemetry/OpenTelemetryTraceUtils.java b/ratis-common/src/main/java/org/apache/ratis/trace/opentelemetry/OpenTelemetryTraceUtils.java new file mode 100644 index 0000000000..b5f0c8c249 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/trace/opentelemetry/OpenTelemetryTraceUtils.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.trace.opentelemetry; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.propagation.TextMapGetter; +import io.opentelemetry.context.propagation.TextMapPropagator; +import org.apache.ratis.proto.RaftProtos.SpanContextProto; + +import java.util.Map; +import java.util.Optional; +import java.util.TreeMap; + +/** OpenTelemetry-specific helpers. Callers using this class must provide OpenTelemetry jars. */ +public final class OpenTelemetryTraceUtils { + private OpenTelemetryTraceUtils() { + } + + public static SpanContextProto injectContextToProto(Context context) { + final Map carrier = new TreeMap<>(); + getTextMapPropagator().inject(context, carrier, (map, key, value) -> map.put(key, value)); + return SpanContextProto.newBuilder().putAllContext(carrier).build(); + } + + static Context extractContextFromProto(SpanContextProto proto) { + if (proto == null || proto.getContextMap().isEmpty()) { + return Context.current(); + } + return getTextMapPropagator().extract(Context.current(), proto, SpanContextGetter.INSTANCE); + } + + private static TextMapPropagator getTextMapPropagator() { + return GlobalOpenTelemetry.getPropagators().getTextMapPropagator(); + } + + private static final class SpanContextGetter implements TextMapGetter { + private static final SpanContextGetter INSTANCE = new SpanContextGetter(); + + @Override + public Iterable keys(SpanContextProto carrier) { + return carrier.getContextMap().keySet(); + } + + @Override + public String get(SpanContextProto carrier, String key) { + return Optional.ofNullable(carrier).map(SpanContextProto::getContextMap) + .map(map -> map.get(key)).orElse(null); + } + } +} diff --git a/ratis-server/pom.xml b/ratis-server/pom.xml index 2c0bc93a25..74cd7c5462 100644 --- a/ratis-server/pom.xml +++ b/ratis-server/pom.xml @@ -84,6 +84,11 @@ mockito-core test + + io.opentelemetry + opentelemetry-sdk-testing + test + org.apache.ratis 
ratis-metrics-api diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerImplTracingTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerImplTracingTests.java index 81e9289f42..db0c466b73 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerImplTracingTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerImplTracingTests.java @@ -43,6 +43,7 @@ import org.apache.ratis.trace.TraceConfigKeys; import org.apache.ratis.trace.TraceServer; import org.apache.ratis.trace.TraceUtils; +import org.apache.ratis.trace.opentelemetry.OpenTelemetryTraceUtils; import org.apache.ratis.util.JavaUtils; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -182,7 +183,7 @@ private static SpanContextProto injectedSpanContext() { .setSpanKind(SpanKind.CLIENT) .startSpan(); try { - return TraceUtils.injectContextToProto(Context.current().with(remoteParent)); + return OpenTelemetryTraceUtils.injectContextToProto(Context.current().with(remoteParent)); } finally { remoteParent.end(); } @@ -253,7 +254,7 @@ private static RaftClientRequest newRaftClientRequest(RaftClientRequest.Type typ .setGroupId(RaftGroupId.randomId()) .setCallId(1L) .setType(type) - .setSpanContext(TraceUtils.injectContextToProto(clientContext)) + .setSpanContext(OpenTelemetryTraceUtils.injectContextToProto(clientContext)) .build(); } finally { clientSpan.end(); @@ -265,4 +266,3 @@ private long randomCallId() { } } - diff --git a/ratis-test/pom.xml b/ratis-test/pom.xml index 577262d84c..54f076ac1a 100644 --- a/ratis-test/pom.xml +++ b/ratis-test/pom.xml @@ -156,5 +156,10 @@ mockito-core test + + io.opentelemetry + opentelemetry-sdk-testing + test +