Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changelog/5155.fixed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
`opentelemetry-exporter-otlp-proto-http`: Log server error details from response body on export failure
Original file line number Diff line number Diff line change
@@ -1,16 +1,73 @@
# Copyright The OpenTelemetry Authors
# SPDX-License-Identifier: Apache-2.0

import logging
from os import environ
from typing import Literal

import requests
from google.rpc.status_pb2 import Status

from opentelemetry.sdk.environment_variables import (
_OTEL_PYTHON_EXPORTER_OTLP_HTTP_CREDENTIAL_PROVIDER,
)
from opentelemetry.util._importlib_metadata import entry_points

_logger = logging.getLogger(__name__)

_CONTENT_TYPE_PROTOBUF = "application/x-protobuf"
_CONTENT_TYPE_JSON = "application/json"


def _parse_response_body(resp: requests.Response) -> str:
"""Parse an HTTP response body based on its Content-Type header.

Per the OTLP spec, error responses (4xx/5xx) use ``google.rpc.Status``
for protobuf bodies and the equivalent JSON representation.

Args:
resp: The HTTP response from the OTLP endpoint.

Returns:
A human-readable string describing the response body error details,
or ``resp.reason`` if the body is empty or cannot be parsed.
"""
if not resp.content:
return resp.reason

content_type = (
resp.headers.get("Content-Type", "").split(";", 1)[0].strip().lower()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we split on semi-colon, shouldn't we do it on comma ? https://requests.readthedocs.io/en/latest/user/quickstart/#response-headers -- mentions comma

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The semicolon split is intentional here. Per https://www.rfc-editor.org/rfc/rfc9110#section-8.3.1, the Content-Type header uses semicolons to separate the media type from parameters like charset:

Content-Type: application/x-protobuf; charset=utf-8

I believe the comma behavior you linked is about how requests combines multiple values of the same header into a single string - that's a different concept. Since Content-Type only has one value with optional ;-delimited parameters, splitting on ; is correct to extract just the media type portion.

)

if content_type == _CONTENT_TYPE_PROTOBUF:
status = Status()
try:
status.ParseFromString(resp.content)
except Exception: # pylint: disable=broad-except
Comment thread
grvmishra788 marked this conversation as resolved.
_logger.debug(
"Failed to parse protobuf response body", exc_info=True
)
return resp.reason
return status.message or resp.reason

if content_type == _CONTENT_TYPE_JSON:
try:
body = resp.json()
except Exception: # pylint: disable=broad-except
_logger.debug("Failed to parse JSON response body", exc_info=True)
return resp.text or resp.reason
if isinstance(body, dict):
partial = body.get("partialSuccess")
if isinstance(partial, dict) and (
error_message := partial.get("errorMessage", "")
):
return error_message
# google.rpc.Status uses "message"
if rpc_message := body.get("message", ""):
return rpc_message

return resp.text.strip() or resp.reason


def _is_retryable(resp: requests.Response) -> bool:
if resp.status_code == 408:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from opentelemetry.exporter.otlp.proto.http._common import (
_is_retryable,
_load_session_from_envvar,
_parse_response_body,
)
from opentelemetry.metrics import MeterProvider
from opentelemetry.sdk._logs import ReadableLogRecord
Expand Down Expand Up @@ -215,7 +216,7 @@ def export(
retryable = isinstance(error, ConnectionError)
status_code = None
else:
reason = resp.reason
reason = _parse_response_body(resp)
retryable = _is_retryable(resp)
status_code = resp.status_code

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@
from opentelemetry.exporter.otlp.proto.http._common import (
_is_retryable,
_load_session_from_envvar,
_parse_response_body,
)
from opentelemetry.metrics import MeterProvider
from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import ( # noqa: F401
ExportMetricsServiceRequest,
ExportMetricsServiceResponse,
)
from opentelemetry.proto.common.v1.common_pb2 import ( # noqa: F401
AnyValue,
Expand Down Expand Up @@ -286,7 +288,7 @@ def _export_with_retries(
retryable = isinstance(error, ConnectionError)
status_code = None
else:
reason = resp.reason
reason = _parse_response_body(resp)
retryable = _is_retryable(resp)
status_code = resp.status_code

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from opentelemetry.exporter.otlp.proto.http._common import (
_is_retryable,
_load_session_from_envvar,
_parse_response_body,
)
from opentelemetry.metrics import MeterProvider
from opentelemetry.sdk.environment_variables import (
Expand Down Expand Up @@ -208,7 +209,7 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
retryable = isinstance(error, ConnectionError)
status_code = None
else:
reason = resp.reason
reason = _parse_response_body(resp)
retryable = _is_retryable(resp)
status_code = resp.status_code

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

# pylint: disable=protected-access

import logging
import threading
import time
import unittest
Expand All @@ -11,6 +12,7 @@

import requests
from google.protobuf.json_format import MessageToDict
from google.rpc.status_pb2 import Status
from requests import Session
from requests.exceptions import ConnectionError
from requests.models import Response
Expand Down Expand Up @@ -74,6 +76,11 @@ def setUp(self):
self.meter_provider = MeterProvider(
metric_readers=[self.metric_reader]
)
# Reset DuplicateFilter state between tests so each test can log freely.
log_exporter_logger = logging.getLogger(
"opentelemetry.exporter.otlp.proto.http._log_exporter"
)
log_exporter_logger.filters.clear()

def test_constructor_default(self):
exporter = OTLPLogExporter()
Expand Down Expand Up @@ -656,6 +663,25 @@ def test_shutdown_interrupts_retry_backoff(self, mock_post):

assert after - before < 0.2

@patch.object(Session, "post")
def test_error_response_with_protobuf_body(self, mock_post):
status = Status(code=3, message="invalid log data")
resp = Response()
resp.status_code = 400
resp.reason = "Bad Request"
resp._content = status.SerializeToString() # pylint: disable=protected-access
resp.headers["Content-Type"] = "application/x-protobuf"
mock_post.return_value = resp

exporter = OTLPLogExporter()
with self.assertLogs(level="ERROR") as logs:
result = exporter.export(self._get_sdk_log_data())

self.assertEqual(result, LogRecordExportResult.FAILURE)
self.assertTrue(
any("invalid log data" in r.message for r in logs.records)
)

def assert_standard_metric_attrs(self, attributes):
self.assertEqual(
attributes["otel.component.type"], "otlp_http_log_exporter"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
# Copyright The OpenTelemetry Authors
# SPDX-License-Identifier: Apache-2.0

import json
import threading
import time
import unittest
from logging import WARNING
from unittest.mock import MagicMock, Mock, patch

import requests
from google.rpc.status_pb2 import Status
from requests import Session
from requests.exceptions import ConnectionError
from requests.models import Response
Expand Down Expand Up @@ -486,6 +488,44 @@ def test_shutdown_interrupts_retry_backoff(self, mock_post):

assert after - before < 0.2

@patch.object(Session, "post")
def test_error_response_with_protobuf_body(self, mock_post):
status = Status(code=3, message="invalid span data")
resp = Response()
resp.status_code = 400
resp.reason = "Bad Request"
resp._content = status.SerializeToString() # pylint: disable=protected-access
resp.headers["Content-Type"] = "application/x-protobuf"
mock_post.return_value = resp

exporter = OTLPSpanExporter()
with self.assertLogs(level="ERROR") as logs:
result = exporter.export([BASIC_SPAN])

self.assertEqual(result, SpanExportResult.FAILURE)
self.assertTrue(
any("invalid span data" in r.message for r in logs.records)
)

@patch.object(Session, "post")
def test_error_response_with_json_body(self, mock_post):
body = json.dumps({"message": "quota limit reached"}).encode()
resp = Response()
resp.status_code = 400
resp.reason = "Bad Request"
resp._content = body # pylint: disable=protected-access
resp.headers["Content-Type"] = "application/json"
mock_post.return_value = resp

exporter = OTLPSpanExporter()
with self.assertLogs(level="ERROR") as logs:
result = exporter.export([BASIC_SPAN])

self.assertEqual(result, SpanExportResult.FAILURE)
self.assertTrue(
any("quota limit reached" in r.message for r in logs.records)
)

def assert_standard_metric_attrs(self, attributes):
self.assertEqual(
attributes["otel.component.type"], "otlp_http_span_exporter"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
# Copyright The OpenTelemetry Authors
# SPDX-License-Identifier: Apache-2.0

import json
import unittest

from google.rpc.status_pb2 import Status
from requests.models import Response

from opentelemetry.exporter.otlp.proto.http._common import _parse_response_body


def _make_response(
content: bytes,
content_type: str,
reason: str = "Bad Request",
status_code: int = 400,
) -> Response:
resp = Response()
resp.status_code = status_code
resp.reason = reason
resp._content = content # pylint: disable=protected-access
resp.headers["Content-Type"] = content_type
return resp


class TestParseResponseBody(unittest.TestCase):
def test_protobuf_content_type_with_error_message(self):
status = Status(code=8, message="quota exceeded for project")
resp = _make_response(
content=status.SerializeToString(),
content_type="application/x-protobuf",
)
self.assertEqual(
_parse_response_body(resp),
"quota exceeded for project",
)

def test_protobuf_content_type_without_message_falls_back_to_reason(self):
status = Status(code=2)
resp = _make_response(
content=status.SerializeToString(),
content_type="application/x-protobuf",
reason="Bad Request",
)
self.assertEqual(
_parse_response_body(resp),
"Bad Request",
)

def test_protobuf_content_type_with_charset_parameter(self):
status = Status(code=8, message="quota exceeded")
resp = _make_response(
content=status.SerializeToString(),
content_type="application/x-protobuf; charset=utf-8",
)
self.assertEqual(
_parse_response_body(resp),
"quota exceeded",
)

def test_json_content_type_with_partial_success_error_message(self):
body = json.dumps(
{"partialSuccess": {"errorMessage": "rate limit exceeded"}}
).encode()
resp = _make_response(content=body, content_type="application/json")
self.assertEqual(
_parse_response_body(resp),
"rate limit exceeded",
)

def test_json_content_type_with_rpc_status_message(self):
body = json.dumps({"message": "permission denied"}).encode()
resp = _make_response(content=body, content_type="application/json")
self.assertEqual(
_parse_response_body(resp),
"permission denied",
)

def test_json_content_type_with_charset_parameter(self):
body = json.dumps({"message": "not authorized"}).encode()
resp = _make_response(
content=body, content_type="application/json; charset=utf-8"
)
self.assertEqual(
_parse_response_body(resp),
"not authorized",
)

def test_json_partial_success_null_falls_through(self):
body = json.dumps({"partialSuccess": None}).encode()
resp = _make_response(content=body, content_type="application/json")
self.assertEqual(
_parse_response_body(resp),
'{"partialSuccess": null}',
)

def test_json_partial_success_non_dict_falls_through(self):
body = json.dumps({"partialSuccess": "x"}).encode()
resp = _make_response(content=body, content_type="application/json")
self.assertEqual(
_parse_response_body(resp),
'{"partialSuccess": "x"}',
)

def test_unknown_content_type_returns_text(self):
resp = _make_response(
content=b"something went wrong",
content_type="text/plain",
)
self.assertEqual(
_parse_response_body(resp),
"something went wrong",
)

def test_empty_body_returns_reason(self):
resp = _make_response(
content=b"",
content_type="application/x-protobuf",
reason="Service Unavailable",
)
self.assertEqual(
_parse_response_body(resp),
"Service Unavailable",
)

def test_malformed_protobuf_body_falls_back_to_reason(self):
resp = _make_response(
content=b"\xff\xfe invalid protobuf",
content_type="application/x-protobuf",
reason="Bad Request",
)
self.assertEqual(
_parse_response_body(resp),
"Bad Request",
)

def test_malformed_json_body_falls_back_to_text(self):
resp = _make_response(
content=b"not valid json {{{",
content_type="application/json",
reason="Bad Request",
)
self.assertEqual(
_parse_response_body(resp),
"not valid json {{{",
)


if __name__ == "__main__":
unittest.main()
Loading