diff --git a/CHANGELOG.md b/CHANGELOG.md index b68a917818..76e4e2d1ba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,9 @@ Increment the: * [CODE HEALTH] Remove last unused nostd namespace alias in otlp_populate [#4114](https://github.com/open-telemetry/opentelemetry-cpp/pull/4114) +* [EXPORTER] Handle OTLP partial success response + [#4104](https://github.com/open-telemetry/opentelemetry-cpp/pull/4104) + ## [1.27.0] 2026-05-13 * [RELEASE] Bump main branch to 1.27.0-dev diff --git a/exporters/otlp/BUILD b/exporters/otlp/BUILD index 60311dc949..749ba94564 100644 --- a/exporters/otlp/BUILD +++ b/exporters/otlp/BUILD @@ -209,6 +209,7 @@ cc_library( "//sdk/src/common:base64", "@com_github_opentelemetry_proto//:common_proto_cc", "@com_google_absl//absl/strings", + "@com_google_protobuf//:protobuf", "@github_nlohmann_json//:json", ], ) @@ -695,6 +696,7 @@ cc_test( ":otlp_grpc_exporter", "//api", "//sdk/src/metrics", + "//test_common:headers", "@com_google_googletest//:gtest_main", ], ) @@ -876,6 +878,7 @@ cc_test( "//api", "//sdk/src/logs", "//sdk/src/metrics", + "//test_common:headers", "@com_google_googletest//:gtest_main", ], ) @@ -909,6 +912,7 @@ cc_test( ":otlp_grpc_metric_exporter", "//api", "//sdk/src/metrics", + "//test_common:headers", "@com_google_googletest//:gtest_main", ], ) diff --git a/exporters/otlp/CMakeLists.txt b/exporters/otlp/CMakeLists.txt index 2d53476be6..1a5540d37a 100644 --- a/exporters/otlp/CMakeLists.txt +++ b/exporters/otlp/CMakeLists.txt @@ -907,7 +907,7 @@ if(BUILD_TESTING) add_executable(otlp_grpc_exporter_test test/otlp_grpc_exporter_test.cc) target_link_libraries( otlp_grpc_exporter_test ${GTEST_BOTH_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} - ${GMOCK_LIB} opentelemetry_exporter_otlp_grpc) + ${GMOCK_LIB} opentelemetry_exporter_otlp_grpc opentelemetry_test_common) gtest_add_tests( TARGET otlp_grpc_exporter_test TEST_PREFIX exporter.otlp. @@ -942,7 +942,8 @@ if(BUILD_TESTING) opentelemetry_exporter_otlp_grpc opentelemetry_exporter_otlp_grpc_log opentelemetry_trace - opentelemetry_logs) + opentelemetry_logs + opentelemetry_test_common) gtest_add_tests( TARGET otlp_grpc_log_record_exporter_test TEST_PREFIX exporter.otlp. @@ -962,9 +963,13 @@ if(BUILD_TESTING) add_executable(otlp_grpc_metric_exporter_test test/otlp_grpc_metric_exporter_test.cc) target_link_libraries( - otlp_grpc_metric_exporter_test ${GTEST_BOTH_LIBRARIES} - ${CMAKE_THREAD_LIBS_INIT} ${GMOCK_LIB} opentelemetry_exporter_otlp_grpc - opentelemetry_exporter_otlp_grpc_metrics) + otlp_grpc_metric_exporter_test + ${GTEST_BOTH_LIBRARIES} + ${CMAKE_THREAD_LIBS_INIT} + ${GMOCK_LIB} + opentelemetry_exporter_otlp_grpc + opentelemetry_exporter_otlp_grpc_metrics + opentelemetry_test_common) gtest_add_tests( TARGET otlp_grpc_metric_exporter_test TEST_PREFIX exporter.otlp. @@ -992,7 +997,8 @@ if(BUILD_TESTING) opentelemetry_exporter_otlp_http opentelemetry_http_client_nosend nlohmann_json::nlohmann_json - protobuf::libprotobuf) + protobuf::libprotobuf + opentelemetry_test_common) gtest_add_tests( TARGET otlp_http_exporter_test TEST_PREFIX exporter.otlp. @@ -1018,7 +1024,8 @@ if(BUILD_TESTING) opentelemetry_exporter_otlp_http_log opentelemetry_logs opentelemetry_http_client_nosend - nlohmann_json::nlohmann_json) + nlohmann_json::nlohmann_json + opentelemetry_test_common) gtest_add_tests( TARGET otlp_http_log_record_exporter_test TEST_PREFIX exporter.otlp. @@ -1046,7 +1053,8 @@ if(BUILD_TESTING) opentelemetry_metrics opentelemetry_http_client_nosend nlohmann_json::nlohmann_json - protobuf::libprotobuf) + protobuf::libprotobuf + opentelemetry_test_common) gtest_add_tests( TARGET otlp_http_metric_exporter_test TEST_PREFIX exporter.otlp. diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client.h index 6185879277..5ce6e1e217 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_grpc_client.h @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -22,8 +23,6 @@ // clang-format on #ifdef ENABLE_ASYNC_EXPORT -# include - # include "opentelemetry/sdk/common/exporter_utils.h" #endif /* ENABLE_ASYNC_EXPORT */ @@ -151,21 +150,30 @@ class OtlpGrpcClient std::unique_ptr &&context, std::unique_ptr &&arena, proto::collector::trace::v1::ExportTraceServiceRequest *request, - proto::collector::trace::v1::ExportTraceServiceResponse *response); + proto::collector::trace::v1::ExportTraceServiceResponse *response, + std::function &&, + proto::collector::trace::v1::ExportTraceServiceResponse *)> &&on_complete = + {}); static grpc::Status DelegateExport( proto::collector::metrics::v1::MetricsService::StubInterface *stub, std::unique_ptr &&context, std::unique_ptr &&arena, proto::collector::metrics::v1::ExportMetricsServiceRequest *request, - proto::collector::metrics::v1::ExportMetricsServiceResponse *response); + proto::collector::metrics::v1::ExportMetricsServiceResponse *response, + std::function &&, + proto::collector::metrics::v1::ExportMetricsServiceResponse *)> + &&on_complete = {}); static grpc::Status DelegateExport( proto::collector::logs::v1::LogsService::StubInterface *stub, std::unique_ptr &&context, std::unique_ptr &&arena, proto::collector::logs::v1::ExportLogsServiceRequest *request, - proto::collector::logs::v1::ExportLogsServiceResponse *response); + proto::collector::logs::v1::ExportLogsServiceResponse *response, + std::function &&, + proto::collector::logs::v1::ExportLogsServiceResponse *)> &&on_complete = + {}); void AddReference(OtlpGrpcClientReferenceGuard &guard, const OtlpGrpcClientOptions &options) noexcept; diff --git a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_client.h b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_client.h index 15197cf4d6..cffb371565 100644 --- a/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_client.h +++ b/exporters/otlp/include/opentelemetry/exporters/otlp/otlp_http_client.h @@ -23,13 +23,14 @@ #include "opentelemetry/sdk/common/thread_instrumentation.h" #include "opentelemetry/version.h" -// forward declare google::protobuf::Message +// forward declare google::protobuf::Message and google::protobuf::Arena namespace google { namespace protobuf { +class Arena; class Message; -} +} // namespace protobuf } // namespace google OPENTELEMETRY_BEGIN_NAMESPACE @@ -204,6 +205,24 @@ class OtlpHttpClient std::function &&result_callback, std::size_t max_running_requests) noexcept; + /** + * Async export with typed response + * @param message message to export, it should be ExportTraceServiceRequest, + * ExportMetricsServiceRequest or ExportLogsServiceRequest + * @param arena protobuf arena that owns response + * @param response the parsed body is written here on 2xx + * @param result_callback callback to call when the exporting is done + * @param max_running_requests wait for at most max_running_requests running requests + * @return return the status of this operation + */ + sdk::common::ExportResult Export( + const google::protobuf::Message &message, + std::unique_ptr &&arena, + google::protobuf::Message *response, + std::function + &&result_callback, + std::size_t max_running_requests) noexcept; + /** * Force flush the HTTP client. */ @@ -242,13 +261,20 @@ class OtlpHttpClient std::shared_ptr session; std::shared_ptr event_handle; - HttpSessionData() = default; + std::unique_ptr arena; + google::protobuf::Message *response = nullptr; + + HttpSessionData() noexcept; + HttpSessionData(std::shared_ptr &&input_session, + std::shared_ptr &&input_handle, + std::unique_ptr &&input_arena, + google::protobuf::Message *input_response) noexcept; - explicit HttpSessionData( - std::shared_ptr &&input_session, - std::shared_ptr &&input_handle) - : session(std::move(input_session)), event_handle(std::move(input_handle)) - {} + ~HttpSessionData(); + HttpSessionData(HttpSessionData &&) noexcept; + HttpSessionData &operator=(HttpSessionData &&) noexcept; + HttpSessionData(const HttpSessionData &) = delete; + HttpSessionData &operator=(const HttpSessionData &) = delete; }; /** @@ -260,6 +286,21 @@ class OtlpHttpClient const google::protobuf::Message &message, std::function &&result_callback) noexcept; + /** + * @brief Create a Session object that deserializes the response body or return an error result. + * + * @param message The message to send + * @param arena Protobuf arena that owns response + * @param response the parsed body is written here on 2xx + * @param result_callback Callback for the export result; receives the populated response + */ + nostd::variant createSession( + const google::protobuf::Message &message, + std::unique_ptr &&arena, + google::protobuf::Message *response, + std::function + &&result_callback) noexcept; + /** * Add http session and hold it's lifetime. * @param session_data the session to add diff --git a/exporters/otlp/src/otlp_grpc_client.cc b/exporters/otlp/src/otlp_grpc_client.cc index 239190adc7..37e64feac8 100644 --- a/exporters/otlp/src/otlp_grpc_client.cc +++ b/exporters/otlp/src/otlp_grpc_client.cc @@ -515,11 +515,19 @@ grpc::Status OtlpGrpcClient::DelegateExport( std::unique_ptr &&context, std::unique_ptr &&arena, proto::collector::trace::v1::ExportTraceServiceRequest *request, - proto::collector::trace::v1::ExportTraceServiceResponse *response) + proto::collector::trace::v1::ExportTraceServiceResponse *response, + std::function &&, + proto::collector::trace::v1::ExportTraceServiceResponse *)> &&on_complete) { auto trace_grpc_context = std::move(context); auto trace_arena = std::move(arena); - return stub->Export(trace_grpc_context.get(), *request, response); + auto status = stub->Export(trace_grpc_context.get(), *request, response); + if (status.ok() && on_complete) + { + auto callback = std::move(on_complete); + callback(std::move(trace_arena), response); + } + return status; } grpc::Status OtlpGrpcClient::DelegateExport( @@ -527,11 +535,20 @@ grpc::Status OtlpGrpcClient::DelegateExport( std::unique_ptr &&context, std::unique_ptr &&arena, proto::collector::metrics::v1::ExportMetricsServiceRequest *request, - proto::collector::metrics::v1::ExportMetricsServiceResponse *response) + proto::collector::metrics::v1::ExportMetricsServiceResponse *response, + std::function &&, + proto::collector::metrics::v1::ExportMetricsServiceResponse *)> + &&on_complete) { auto metrics_grpc_context = std::move(context); auto metrics_arena = std::move(arena); - return stub->Export(metrics_grpc_context.get(), *request, response); + auto status = stub->Export(metrics_grpc_context.get(), *request, response); + if (status.ok() && on_complete) + { + auto callback = std::move(on_complete); + callback(std::move(metrics_arena), response); + } + return status; } grpc::Status OtlpGrpcClient::DelegateExport( @@ -539,11 +556,19 @@ grpc::Status OtlpGrpcClient::DelegateExport( std::unique_ptr &&context, std::unique_ptr &&arena, proto::collector::logs::v1::ExportLogsServiceRequest *request, - proto::collector::logs::v1::ExportLogsServiceResponse *response) + proto::collector::logs::v1::ExportLogsServiceResponse *response, + std::function &&, + proto::collector::logs::v1::ExportLogsServiceResponse *)> &&on_complete) { auto logs_grpc_context = std::move(context); auto logs_arena = std::move(arena); - return stub->Export(logs_grpc_context.get(), *request, response); + auto status = stub->Export(logs_grpc_context.get(), *request, response); + if (status.ok() && on_complete) + { + auto callback = std::move(on_complete); + callback(std::move(logs_arena), response); + } + return status; } void OtlpGrpcClient::AddReference(OtlpGrpcClientReferenceGuard &guard, diff --git a/exporters/otlp/src/otlp_grpc_exporter.cc b/exporters/otlp/src/otlp_grpc_exporter.cc index ffeb85993f..f561761b3a 100644 --- a/exporters/otlp/src/otlp_grpc_exporter.cc +++ b/exporters/otlp/src/otlp_grpc_exporter.cc @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -32,10 +33,6 @@ #include "opentelemetry/exporters/otlp/protobuf_include_suffix.h" // IWYU pragma: keep // clang-format on -#ifdef ENABLE_ASYNC_EXPORT -# include -#endif - OPENTELEMETRY_BEGIN_NAMESPACE namespace exporter { @@ -151,7 +148,7 @@ sdk::common::ExportResult OtlpGrpcExporter::Export( opentelemetry::sdk::common::ExportResult result, std::unique_ptr &&arena, const proto::collector::trace::v1::ExportTraceServiceRequest &request, - proto::collector::trace::v1::ExportTraceServiceResponse *) { + proto::collector::trace::v1::ExportTraceServiceResponse *response) { auto trace_arena = std::move(arena); if (result != opentelemetry::sdk::common::ExportResult::kSuccess) { @@ -159,6 +156,15 @@ sdk::common::ExportResult OtlpGrpcExporter::Export( << request.resource_spans_size() << " trace span(s) error: " << static_cast(result)); } + else if (response->has_partial_success() && + (response->partial_success().rejected_spans() != 0 || + !response->partial_success().error_message().empty())) + { + const auto &partial = response->partial_success(); + OTEL_INTERNAL_LOG_ERROR("[OTLP TRACE GRPC Exporter] Export partial success: " + << partial.rejected_spans() << " span(s) rejected: \"" + << partial.error_message() << "\""); + } else { OTEL_INTERNAL_LOG_DEBUG("[OTLP TRACE GRPC Exporter] Export " @@ -175,7 +181,25 @@ sdk::common::ExportResult OtlpGrpcExporter::Export( google::protobuf::Arena::Create( arena.get()); grpc::Status status = OtlpGrpcClient::DelegateExport( - trace_service_stub_.get(), std::move(context), std::move(arena), request, response); + trace_service_stub_.get(), std::move(context), std::move(arena), request, response, + [resource_spans_size](std::unique_ptr &&arena, + proto::collector::trace::v1::ExportTraceServiceResponse *response) { + auto trace_arena = std::move(arena); + if (response->has_partial_success() && + (response->partial_success().rejected_spans() != 0 || + !response->partial_success().error_message().empty())) + { + const auto &partial = response->partial_success(); + OTEL_INTERNAL_LOG_ERROR("[OTLP TRACE GRPC Exporter] Export partial success: " + << partial.rejected_spans() << " span(s) rejected: \"" + << partial.error_message() << "\""); + } + else + { + OTEL_INTERNAL_LOG_DEBUG("[OTLP TRACE GRPC Exporter] Export " + << resource_spans_size << " trace span(s) success"); + } + }); if (!status.ok()) { OTEL_INTERNAL_LOG_ERROR("[OTLP TRACE GRPC Exporter] Export() failed with status_code: \"" @@ -183,11 +207,6 @@ sdk::common::ExportResult OtlpGrpcExporter::Export( << "\" error_message: \"" << status.error_message() << "\""); return sdk::common::ExportResult::kFailure; } - else - { - OTEL_INTERNAL_LOG_DEBUG("[OTLP TRACE GRPC Exporter] Export " << resource_spans_size - << " trace span(s) success"); - } #ifdef ENABLE_ASYNC_EXPORT } #endif diff --git a/exporters/otlp/src/otlp_grpc_log_record_exporter.cc b/exporters/otlp/src/otlp_grpc_log_record_exporter.cc index cae08a22eb..6490c0fd7a 100644 --- a/exporters/otlp/src/otlp_grpc_log_record_exporter.cc +++ b/exporters/otlp/src/otlp_grpc_log_record_exporter.cc @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -31,10 +32,6 @@ #include "opentelemetry/exporters/otlp/protobuf_include_suffix.h" // IWYU pragma: keep // clang-format on -#ifdef ENABLE_ASYNC_EXPORT -# include -#endif - OPENTELEMETRY_BEGIN_NAMESPACE namespace exporter { @@ -153,7 +150,7 @@ opentelemetry::sdk::common::ExportResult OtlpGrpcLogRecordExporter::Export( opentelemetry::sdk::common::ExportResult result, std::unique_ptr &&arena, const proto::collector::logs::v1::ExportLogsServiceRequest &request, - proto::collector::logs::v1::ExportLogsServiceResponse *) { + proto::collector::logs::v1::ExportLogsServiceResponse *response) { auto logs_arena = std::move(arena); if (result != opentelemetry::sdk::common::ExportResult::kSuccess) { @@ -161,6 +158,16 @@ opentelemetry::sdk::common::ExportResult OtlpGrpcLogRecordExporter::Export( << request.resource_logs_size() << " log(s) error: " << static_cast(result)); } + else if (response->has_partial_success() && + (response->partial_success().rejected_log_records() != 0 || + !response->partial_success().error_message().empty())) + { + const auto &partial = response->partial_success(); + OTEL_INTERNAL_LOG_ERROR("[OTLP LOG GRPC Exporter] Export partial success: " + << partial.rejected_log_records() + << " log record(s) rejected: \"" << partial.error_message() + << "\""); + } else { OTEL_INTERNAL_LOG_DEBUG("[OTLP LOG GRPC Exporter] Export " @@ -176,7 +183,21 @@ opentelemetry::sdk::common::ExportResult OtlpGrpcLogRecordExporter::Export( google::protobuf::Arena::Create( arena.get()); grpc::Status status = OtlpGrpcClient::DelegateExport( - log_service_stub_.get(), std::move(context), std::move(arena), request, response); + log_service_stub_.get(), std::move(context), std::move(arena), request, response, + [](std::unique_ptr &&arena, + proto::collector::logs::v1::ExportLogsServiceResponse *response) { + auto logs_arena = std::move(arena); + if (response->has_partial_success() && + (response->partial_success().rejected_log_records() != 0 || + !response->partial_success().error_message().empty())) + { + const auto &partial = response->partial_success(); + OTEL_INTERNAL_LOG_ERROR("[OTLP LOG GRPC Exporter] Export partial success: " + << partial.rejected_log_records() + << " log record(s) rejected: \"" << partial.error_message() + << "\""); + } + }); if (!status.ok()) { diff --git a/exporters/otlp/src/otlp_grpc_metric_exporter.cc b/exporters/otlp/src/otlp_grpc_metric_exporter.cc index 669613a787..9446157ec1 100644 --- a/exporters/otlp/src/otlp_grpc_metric_exporter.cc +++ b/exporters/otlp/src/otlp_grpc_metric_exporter.cc @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -30,10 +31,6 @@ #include "opentelemetry/exporters/otlp/protobuf_include_suffix.h" // IWYU pragma: keep // clang-format on -#ifdef ENABLE_ASYNC_EXPORT -# include -#endif - OPENTELEMETRY_BEGIN_NAMESPACE namespace exporter { @@ -161,7 +158,7 @@ opentelemetry::sdk::common::ExportResult OtlpGrpcMetricExporter::Export( opentelemetry::sdk::common::ExportResult result, std::unique_ptr &&arena, const proto::collector::metrics::v1::ExportMetricsServiceRequest &request, - proto::collector::metrics::v1::ExportMetricsServiceResponse *) { + proto::collector::metrics::v1::ExportMetricsServiceResponse *response) { auto metrics_arena = std::move(arena); if (result != opentelemetry::sdk::common::ExportResult::kSuccess) { @@ -169,6 +166,16 @@ opentelemetry::sdk::common::ExportResult OtlpGrpcMetricExporter::Export( << request.resource_metrics_size() << " metric(s) error: " << static_cast(result)); } + else if (response->has_partial_success() && + (response->partial_success().rejected_data_points() != 0 || + !response->partial_success().error_message().empty())) + { + const auto &partial = response->partial_success(); + OTEL_INTERNAL_LOG_ERROR("[OTLP METRIC GRPC Exporter] Export partial success: " + << partial.rejected_data_points() + << " data point(s) rejected: \"" << partial.error_message() + << "\""); + } else { OTEL_INTERNAL_LOG_DEBUG("[OTLP METRIC GRPC Exporter] Export " @@ -185,7 +192,21 @@ opentelemetry::sdk::common::ExportResult OtlpGrpcMetricExporter::Export( proto::collector::metrics::v1::ExportMetricsServiceResponse>(arena.get()); grpc::Status status = OtlpGrpcClient::DelegateExport( - metrics_service_stub_.get(), std::move(context), std::move(arena), request, response); + metrics_service_stub_.get(), std::move(context), std::move(arena), request, response, + [](std::unique_ptr &&arena, + proto::collector::metrics::v1::ExportMetricsServiceResponse *response) { + auto metrics_arena = std::move(arena); + if (response->has_partial_success() && + (response->partial_success().rejected_data_points() != 0 || + !response->partial_success().error_message().empty())) + { + const auto &partial = response->partial_success(); + OTEL_INTERNAL_LOG_ERROR("[OTLP METRIC GRPC Exporter] Export partial success: " + << partial.rejected_data_points() + << " data point(s) rejected: \"" << partial.error_message() + << "\""); + } + }); if (!status.ok()) { diff --git a/exporters/otlp/src/otlp_http_client.cc b/exporters/otlp/src/otlp_http_client.cc index 8542c86de2..703cca5bde 100644 --- a/exporters/otlp/src/otlp_http_client.cc +++ b/exporters/otlp/src/otlp_http_client.cc @@ -40,9 +40,13 @@ // clang-format off #include "opentelemetry/exporters/otlp/protobuf_include_prefix.h" // IWYU pragma: keep // clang-format on +#include #include #include #include +#include +// IWYU pragma: no_include +// IWYU pragma: no_include // clang-format off #include "opentelemetry/exporters/otlp/protobuf_include_suffix.h" // IWYU pragma: keep // clang-format on @@ -69,11 +73,19 @@ class ResponseHandler : public http_client::EventHandler { public: /** - * Creates a response handler, that by default doesn't display to console + * Creates a response handler, that by default doesn't display to console. + * Deserializes the response body into the caller-provided typed message on + * successful 2xx responses. */ - ResponseHandler(std::function &&callback, + ResponseHandler(std::function &&callback, + google::protobuf::Message *response, + HttpRequestContentType content_type, bool console_debug = false) - : result_callback_{std::move(callback)}, console_debug_{console_debug} + : result_callback_{std::move(callback)}, + response_{response}, + content_type_{content_type}, + console_debug_{console_debug} {} std::string BuildResponseLogMessage(http_client::Response &response, @@ -123,6 +135,24 @@ class ResponseHandler : public http_client::EventHandler } } + // On 2xx with a non-empty body, parse it into the caller-provided typed response + if (response_ != nullptr && result == sdk::common::ExportResult::kSuccess && !body_.empty()) + { + if (content_type_ == HttpRequestContentType::kJson) + { + if (!google::protobuf::util::JsonStringToMessage(body_, response_).ok()) + { + OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] Failed to parse JSON response body"); + result = sdk::common::ExportResult::kFailure; + } + } + else if (!response_->ParseFromString(body_)) + { + OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] Failed to parse response body"); + result = sdk::common::ExportResult::kFailure; + } + } + { bool expected = false; if (stopping_.compare_exchange_strong(expected, true, std::memory_order_release)) @@ -321,19 +351,23 @@ class ResponseHandler : public http_client::EventHandler // first. OtlpHttpClient *owner = owner_; const opentelemetry::ext::http::client::Session *session = session_; + auto callback = std::move(result_callback_); + google::protobuf::Message *response = response_; - owner_ = nullptr; - session_ = nullptr; + owner_ = nullptr; + session_ = nullptr; + response_ = nullptr; + + // Run the callback before releasing the session, so the arena-owned response stays alive. + if (callback) + { + callback(result, response); + } if (nullptr != owner && nullptr != session) { // Release the session at last owner->ReleaseSession(*session); - - if (result_callback_) - { - result_callback_(result); - } } } @@ -359,7 +393,14 @@ class ResponseHandler : public http_client::EventHandler std::string body_ = ""; // Result callback when in async mode - std::function result_callback_; + std::function + result_callback_; + + // Optional typed response, filled on 2xx + google::protobuf::Message *response_ = nullptr; + + // Request and response body encoding + HttpRequestContentType content_type_; // Whether to print the results from the callback bool console_debug_ = false; @@ -748,13 +789,28 @@ sdk::common::ExportResult OtlpHttpClient::Export( std::function &&result_callback, std::size_t max_running_requests) noexcept { - auto session = createSession(message, std::move(result_callback)); - if (opentelemetry::nostd::holds_alternative(session)) + auto adapted = [cb = std::move(result_callback)](opentelemetry::sdk::common::ExportResult result, + google::protobuf::Message * /*response*/) { + return cb(result); + }; + return Export(message, nullptr, nullptr, std::move(adapted), max_running_requests); +} + +sdk::common::ExportResult OtlpHttpClient::Export( + const google::protobuf::Message &message, + std::unique_ptr &&arena, + google::protobuf::Message *response, + std::function + &&result_callback, + std::size_t max_running_requests) noexcept +{ + auto session = createSession(message, std::move(arena), response, std::move(result_callback)); + if (auto *result = opentelemetry::nostd::get_if(&session)) { - return opentelemetry::nostd::get(session); + return *result; } - addSession(std::move(opentelemetry::nostd::get(session))); + addSession(std::move(*opentelemetry::nostd::get_if(&session))); // Wait for the response to be received if (options_.console_debug) @@ -884,6 +940,22 @@ opentelemetry::nostd::variant &&result_callback) noexcept +{ + auto adapted = [cb = std::move(result_callback)](opentelemetry::sdk::common::ExportResult result, + google::protobuf::Message * /*response*/) { + return cb(result); + }; + return createSession(message, nullptr, nullptr, std::move(adapted)); +} + +opentelemetry::nostd::variant +OtlpHttpClient::createSession( + const google::protobuf::Message &message, + std::unique_ptr &&arena, + google::protobuf::Message *response, + std::function + &&result_callback) noexcept { // Parse uri and store it to cache if (http_uri_.empty()) @@ -899,7 +971,7 @@ OtlpHttpClient::createSession( OTEL_INTERNAL_LOG_ERROR(error_message); const auto result = opentelemetry::sdk::common::ExportResult::kFailure; - result_callback(result); + result_callback(result, response); return result; } @@ -934,7 +1006,7 @@ OtlpHttpClient::createSession( } const auto result = opentelemetry::sdk::common::ExportResult::kFailure; - result_callback(result); + result_callback(result, response); return result; } content_type = kHttpBinaryContentType; @@ -969,7 +1041,7 @@ OtlpHttpClient::createSession( OTEL_INTERNAL_LOG_ERROR(error_message); const auto result = opentelemetry::sdk::common::ExportResult::kFailure; - result_callback(result); + result_callback(result, response); return result; } @@ -996,13 +1068,31 @@ OtlpHttpClient::createSession( request->SetCompression(opentelemetry::ext::http::client::Compression::kGzip); } - // Returns the created session data return HttpSessionData{ std::move(session), - std::shared_ptr{ - new ResponseHandler(std::move(result_callback), options_.console_debug)}}; + std::shared_ptr{new ResponseHandler( + std::move(result_callback), response, options_.content_type, options_.console_debug)}, + std::move(arena), response}; } +OtlpHttpClient::HttpSessionData::HttpSessionData() noexcept = default; + +OtlpHttpClient::HttpSessionData::HttpSessionData( + std::shared_ptr &&input_session, + std::shared_ptr &&input_handle, + std::unique_ptr &&input_arena, + google::protobuf::Message *input_response) noexcept + : session(std::move(input_session)), + event_handle(std::move(input_handle)), + arena(std::move(input_arena)), + response(input_response) +{} + +OtlpHttpClient::HttpSessionData::~HttpSessionData() = default; +OtlpHttpClient::HttpSessionData::HttpSessionData(HttpSessionData &&) noexcept = default; +OtlpHttpClient::HttpSessionData &OtlpHttpClient::HttpSessionData::operator=( + HttpSessionData &&) noexcept = default; + void OtlpHttpClient::addSession(HttpSessionData &&session_data) noexcept { if (!session_data.session || !session_data.event_handle) diff --git a/exporters/otlp/src/otlp_http_exporter.cc b/exporters/otlp/src/otlp_http_exporter.cc index 499d577185..e50fe8fa79 100644 --- a/exporters/otlp/src/otlp_http_exporter.cc +++ b/exporters/otlp/src/otlp_http_exporter.cc @@ -3,6 +3,7 @@ #include #include +#include #include #include #include @@ -31,9 +32,13 @@ #include "opentelemetry/exporters/otlp/protobuf_include_suffix.h" // IWYU pragma: keep // clang-format on -#ifdef ENABLE_ASYNC_EXPORT -# include -#endif +namespace google +{ +namespace protobuf +{ +class Message; +} // namespace protobuf +} // namespace google OPENTELEMETRY_BEGIN_NAMESPACE namespace exporter @@ -168,42 +173,52 @@ opentelemetry::sdk::common::ExportResult OtlpHttpExporter::Export( // When in batch mode, it's easy to export a large number of spans at once, we can alloc a lager // block to reduce memory fragments. arena_options.max_block_size = 65536; - google::protobuf::Arena arena{arena_options}; + // Ownership transfers into HttpSessionData until the request completes + std::unique_ptr arena{new google::protobuf::Arena{arena_options}}; proto::collector::trace::v1::ExportTraceServiceRequest *service_request = google::protobuf::Arena::Create( - &arena); + arena.get()); OtlpRecordableUtils::PopulateRequest(spans, service_request); std::size_t span_count = spans.size(); + + proto::collector::trace::v1::ExportTraceServiceResponse *response = + google::protobuf::Arena::Create( + arena.get()); + + auto handle_result = [span_count](opentelemetry::sdk::common::ExportResult result, + google::protobuf::Message *response_msg) { + if (result != opentelemetry::sdk::common::ExportResult::kSuccess) + { + OTEL_INTERNAL_LOG_ERROR("[OTLP TRACE HTTP Exporter] ERROR: Export " + << span_count + << " trace span(s) error: " << static_cast(result)); + return true; + } + auto *response = + static_cast(response_msg); + if (response->has_partial_success() && (response->partial_success().rejected_spans() != 0 || + !response->partial_success().error_message().empty())) + { + const auto &partial = response->partial_success(); + OTEL_INTERNAL_LOG_ERROR("[OTLP TRACE HTTP Exporter] Export partial success: " + << partial.rejected_spans() << " span(s) rejected: \"" + << partial.error_message() << "\""); + } + else + { + OTEL_INTERNAL_LOG_DEBUG("[OTLP TRACE HTTP Exporter] Export " << span_count + << " trace span(s) success"); + } + return true; + }; + #ifdef ENABLE_ASYNC_EXPORT - http_client_->Export( - *service_request, [span_count](opentelemetry::sdk::common::ExportResult result) { - if (result != opentelemetry::sdk::common::ExportResult::kSuccess) - { - OTEL_INTERNAL_LOG_ERROR("[OTLP TRACE HTTP Exporter] ERROR: Export " - << span_count - << " trace span(s) error: " << static_cast(result)); - } - else - { - OTEL_INTERNAL_LOG_DEBUG("[OTLP TRACE HTTP Exporter] Export " << span_count - << " trace span(s) success"); - } - return true; - }); + http_client_->Export(*service_request, std::move(arena), response, std::move(handle_result), + options_.max_concurrent_requests); return opentelemetry::sdk::common::ExportResult::kSuccess; #else - opentelemetry::sdk::common::ExportResult result = http_client_->Export(*service_request); - if (result != opentelemetry::sdk::common::ExportResult::kSuccess) - { - OTEL_INTERNAL_LOG_ERROR("[OTLP TRACE HTTP Exporter] ERROR: Export " - << span_count << " trace span(s) error: " << static_cast(result)); - } - else - { - OTEL_INTERNAL_LOG_DEBUG("[OTLP TRACE HTTP Exporter] Export " << span_count - << " trace span(s) success"); - } + http_client_->Export(*service_request, std::move(arena), response, std::move(handle_result), 0); return opentelemetry::sdk::common::ExportResult::kSuccess; #endif } diff --git a/exporters/otlp/src/otlp_http_log_record_exporter.cc b/exporters/otlp/src/otlp_http_log_record_exporter.cc index b787ae3a46..c714a2e896 100644 --- a/exporters/otlp/src/otlp_http_log_record_exporter.cc +++ b/exporters/otlp/src/otlp_http_log_record_exporter.cc @@ -3,6 +3,7 @@ #include #include +#include #include #include #include @@ -31,9 +32,13 @@ #include "opentelemetry/exporters/otlp/protobuf_include_suffix.h" // IWYU pragma: keep // clang-format on -#ifdef ENABLE_ASYNC_EXPORT -# include -#endif +namespace google +{ +namespace protobuf +{ +class Message; +} // namespace protobuf +} // namespace google OPENTELEMETRY_BEGIN_NAMESPACE namespace exporter @@ -171,38 +176,51 @@ opentelemetry::sdk::common::ExportResult OtlpHttpLogRecordExporter::Export( // When in batch mode, it's easy to export a large number of spans at once, we can alloc a lager // block to reduce memory fragments. arena_options.max_block_size = 65536; - google::protobuf::Arena arena{arena_options}; + // Ownership transfers into HttpSessionData until the request completes + std::unique_ptr arena{new google::protobuf::Arena{arena_options}}; proto::collector::logs::v1::ExportLogsServiceRequest *service_request = - google::protobuf::Arena::Create(&arena); + google::protobuf::Arena::Create( + arena.get()); OtlpRecordableUtils::PopulateRequest(logs, service_request); std::size_t log_count = logs.size(); -#ifdef ENABLE_ASYNC_EXPORT - http_client_->Export(*service_request, [log_count]( - opentelemetry::sdk::common::ExportResult result) { + + proto::collector::logs::v1::ExportLogsServiceResponse *response = + google::protobuf::Arena::Create( + arena.get()); + + auto handle_result = [log_count](opentelemetry::sdk::common::ExportResult result, + google::protobuf::Message *response_msg) { if (result != opentelemetry::sdk::common::ExportResult::kSuccess) { OTEL_INTERNAL_LOG_ERROR("[OTLP LOG HTTP Exporter] ERROR: Export " << log_count << " log(s) error: " << static_cast(result)); + return true; + } + auto *response = + static_cast(response_msg); + if (response->has_partial_success() && + (response->partial_success().rejected_log_records() != 0 || + !response->partial_success().error_message().empty())) + { + const auto &partial = response->partial_success(); + OTEL_INTERNAL_LOG_ERROR("[OTLP LOG HTTP Exporter] Export partial success: " + << partial.rejected_log_records() << " log record(s) rejected: \"" + << partial.error_message() << "\""); } else { OTEL_INTERNAL_LOG_DEBUG("[OTLP LOG HTTP Exporter] Export " << log_count << " log(s) success"); } return true; - }); + }; + +#ifdef ENABLE_ASYNC_EXPORT + http_client_->Export(*service_request, std::move(arena), response, std::move(handle_result), + options_.max_concurrent_requests); return opentelemetry::sdk::common::ExportResult::kSuccess; #else - opentelemetry::sdk::common::ExportResult result = http_client_->Export(*service_request); - if (result != opentelemetry::sdk::common::ExportResult::kSuccess) - { - OTEL_INTERNAL_LOG_ERROR("[OTLP LOG HTTP Exporter] ERROR: Export " - << log_count << " log(s) error: " << static_cast(result)); - } - else - { - OTEL_INTERNAL_LOG_DEBUG("[OTLP LOG HTTP Exporter] Export " << log_count << " log(s) success"); - } + http_client_->Export(*service_request, std::move(arena), response, std::move(handle_result), 0); return opentelemetry::sdk::common::ExportResult::kSuccess; #endif } diff --git a/exporters/otlp/src/otlp_http_metric_exporter.cc b/exporters/otlp/src/otlp_http_metric_exporter.cc index 38f5a8eb93..b261625604 100644 --- a/exporters/otlp/src/otlp_http_metric_exporter.cc +++ b/exporters/otlp/src/otlp_http_metric_exporter.cc @@ -3,6 +3,7 @@ #include #include +#include #include #include #include @@ -30,9 +31,13 @@ #include "opentelemetry/exporters/otlp/protobuf_include_suffix.h" // IWYU pragma: keep // clang-format on -#ifdef ENABLE_ASYNC_EXPORT -# include -#endif +namespace google +{ +namespace protobuf +{ +class Message; +} // namespace protobuf +} // namespace google OPENTELEMETRY_BEGIN_NAMESPACE namespace exporter @@ -176,20 +181,37 @@ opentelemetry::sdk::common::ExportResult OtlpHttpMetricExporter::Export( // When in batch mode, it's easy to export a large number of spans at once, we can alloc a lager // block to reduce memory fragments. arena_options.max_block_size = 65536; - google::protobuf::Arena arena{arena_options}; + // Ownership transfers into HttpSessionData until the request completes + std::unique_ptr arena{new google::protobuf::Arena{arena_options}}; proto::collector::metrics::v1::ExportMetricsServiceRequest *service_request = google::protobuf::Arena::Create( - &arena); + arena.get()); OtlpMetricUtils::PopulateRequest(data, service_request); std::size_t metric_count = data.scope_metric_data_.size(); -#ifdef ENABLE_ASYNC_EXPORT - http_client_->Export(*service_request, [metric_count]( - opentelemetry::sdk::common::ExportResult result) { + + proto::collector::metrics::v1::ExportMetricsServiceResponse *response = + google::protobuf::Arena::Create( + arena.get()); + + auto handle_result = [metric_count](opentelemetry::sdk::common::ExportResult result, + google::protobuf::Message *response_msg) { if (result != opentelemetry::sdk::common::ExportResult::kSuccess) { OTEL_INTERNAL_LOG_ERROR("[OTLP METRIC HTTP Exporter] ERROR: Export " << metric_count << " metric(s) error: " << static_cast(result)); + return true; + } + auto *response = + static_cast(response_msg); + if (response->has_partial_success() && + (response->partial_success().rejected_data_points() != 0 || + !response->partial_success().error_message().empty())) + { + const auto &partial = response->partial_success(); + OTEL_INTERNAL_LOG_ERROR("[OTLP METRIC HTTP Exporter] Export partial success: " + << partial.rejected_data_points() << " data point(s) rejected: \"" + << partial.error_message() << "\""); } else { @@ -197,20 +219,14 @@ opentelemetry::sdk::common::ExportResult OtlpHttpMetricExporter::Export( << " metric(s) success"); } return true; - }); + }; + +#ifdef ENABLE_ASYNC_EXPORT + http_client_->Export(*service_request, std::move(arena), response, std::move(handle_result), + options_.max_concurrent_requests); return opentelemetry::sdk::common::ExportResult::kSuccess; #else - opentelemetry::sdk::common::ExportResult result = http_client_->Export(*service_request); - if (result != opentelemetry::sdk::common::ExportResult::kSuccess) - { - OTEL_INTERNAL_LOG_ERROR("[OTLP METRIC HTTP Exporter] ERROR: Export " - << metric_count << " metric(s) error: " << static_cast(result)); - } - else - { - OTEL_INTERNAL_LOG_DEBUG("[OTLP METRIC HTTP Exporter] Export " << metric_count - << " metric(s) success"); - } + http_client_->Export(*service_request, std::move(arena), response, std::move(handle_result), 0); return opentelemetry::sdk::common::ExportResult::kSuccess; #endif } diff --git a/exporters/otlp/test/otlp_grpc_exporter_test.cc b/exporters/otlp/test/otlp_grpc_exporter_test.cc index 3ac174b691..46a749913a 100644 --- a/exporters/otlp/test/otlp_grpc_exporter_test.cc +++ b/exporters/otlp/test/otlp_grpc_exporter_test.cc @@ -28,16 +28,21 @@ # include "opentelemetry/exporters/otlp/protobuf_include_suffix.h" # include "opentelemetry/nostd/shared_ptr.h" +# include "opentelemetry/sdk/common/global_log_handler.h" # include "opentelemetry/sdk/trace/simple_processor.h" # include "opentelemetry/sdk/trace/simple_processor_factory.h" # include "opentelemetry/sdk/trace/tracer_provider.h" # include "opentelemetry/sdk/trace/tracer_provider_factory.h" +# include "opentelemetry/test_common/sdk/common/scoped_test_log_handler.h" # include "opentelemetry/trace/provider.h" # include "opentelemetry/trace/tracer_provider.h" # include # include +# include # include +# include +# include # if defined(_MSC_VER) # include "opentelemetry/sdk/common/env_variables.h" @@ -117,6 +122,8 @@ class OtlpMockTraceServiceStub : public proto::collector::trace::v1::MockTraceSe ::grpc::Status last_async_status_; async_interface async_interface_; }; + +using opentelemetry::test_common::ScopedTestLogHandler; } // namespace class OtlpGrpcExporterTestPeer : public ::testing::Test @@ -196,6 +203,47 @@ TEST_F(OtlpGrpcExporterTestPeer, ExportUnitTest) # endif } +// Exporter logs the rejection on partial_success. +TEST_F(OtlpGrpcExporterTestPeer, ExportPartialSuccess) +{ + ScopedTestLogHandler log{sdk::common::internal_log::LogLevel::Error}; + + auto mock_stub = new OtlpMockTraceServiceStub(); + std::unique_ptr stub_interface( + mock_stub); + auto exporter = GetExporter(stub_interface); + + auto recordable = exporter->MakeRecordable(); + recordable->SetName("Test span"); + + nostd::span> batch(&recordable, 1); + EXPECT_CALL(*mock_stub, Export(_, _, _)) + .Times(Exactly(1)) + .WillOnce(Invoke( + [](::grpc::ClientContext *, + const proto::collector::trace::v1::ExportTraceServiceRequest &, + proto::collector::trace::v1::ExportTraceServiceResponse *response) -> ::grpc::Status { + response->mutable_partial_success()->set_rejected_spans(21); + response->mutable_partial_success()->set_error_message("too many spans!!"); + return ::grpc::Status::OK; + })); + + auto result = exporter->Export(batch); + exporter->ForceFlush(); + + EXPECT_EQ(sdk::common::ExportResult::kSuccess, result); + + auto entries = log.Drain(); + auto contains = [&](const std::string &needle) { + return std::any_of(entries.begin(), entries.end(), [&](const ScopedTestLogHandler::Entry &e) { + return e.msg.find(needle) != std::string::npos; + }); + }; + EXPECT_TRUE(contains("partial success")); + EXPECT_TRUE(contains("21 span(s) rejected")); + EXPECT_TRUE(contains("too many spans!!")); +} + // Create spans, let processor call Export() TEST_F(OtlpGrpcExporterTestPeer, ExportIntegrationTest) { diff --git a/exporters/otlp/test/otlp_grpc_log_record_exporter_test.cc b/exporters/otlp/test/otlp_grpc_log_record_exporter_test.cc index 4250f86eee..2d9cd4226c 100644 --- a/exporters/otlp/test/otlp_grpc_log_record_exporter_test.cc +++ b/exporters/otlp/test/otlp_grpc_log_record_exporter_test.cc @@ -6,11 +6,13 @@ #include #include #include +#include #include #include #include #include #include +#include #include "gmock/gmock.h" #include "gtest/gtest.h" @@ -27,6 +29,7 @@ #include "opentelemetry/nostd/string_view.h" #include "opentelemetry/nostd/variant.h" #include "opentelemetry/sdk/common/exporter_utils.h" +#include "opentelemetry/sdk/common/global_log_handler.h" #include "opentelemetry/sdk/logs/batch_log_record_processor.h" #include "opentelemetry/sdk/logs/exporter.h" #include "opentelemetry/sdk/logs/logger_provider.h" @@ -38,6 +41,7 @@ #include "opentelemetry/sdk/trace/simple_processor_factory.h" #include "opentelemetry/sdk/trace/tracer_provider.h" #include "opentelemetry/sdk/trace/tracer_provider_factory.h" +#include "opentelemetry/test_common/sdk/common/scoped_test_log_handler.h" #include "opentelemetry/trace/noop.h" #include "opentelemetry/trace/scope.h" #include "opentelemetry/trace/span.h" @@ -52,6 +56,7 @@ #include "opentelemetry/proto/collector/logs/v1/logs_service_mock.grpc.pb.h" #include "opentelemetry/proto/collector/trace/v1/trace_service_mock.grpc.pb.h" #include "opentelemetry/proto/collector/logs/v1/logs_service.grpc.pb.h" +#include "opentelemetry/proto/collector/logs/v1/logs_service.pb.h" #include "opentelemetry/proto/collector/trace/v1/trace_service.grpc.pb.h" #include "opentelemetry/exporters/otlp/protobuf_include_suffix.h" // IWYU pragma: keep // clang-format on @@ -193,6 +198,8 @@ class OtlpMockLogsServiceStub : public proto::collector::logs::v1::MockLogsServi ::grpc::Status last_async_status_; async_interface async_interface_; }; + +using opentelemetry::test_common::ScopedTestLogHandler; } // namespace class OtlpGrpcLogRecordExporterTestPeer : public ::testing::Test @@ -536,6 +543,44 @@ TEST_F(OtlpGrpcLogRecordExporterTestPeer, ConfigRetryGenericValuesFromEnv) } #endif // NO_GETENV +// Exporter logs the rejection on partial_success. +TEST_F(OtlpGrpcLogRecordExporterTestPeer, ExportPartialSuccess) +{ + ScopedTestLogHandler log{sdk::common::internal_log::LogLevel::Error}; + + auto mock_stub = new OtlpMockLogsServiceStub(); + std::unique_ptr stub_interface(mock_stub); + auto exporter = GetExporter(stub_interface); + + auto recordable = exporter->MakeRecordable(); + + nostd::span> batch(&recordable, 1); + EXPECT_CALL(*mock_stub, Export(_, _, _)) + .Times(Exactly(1)) + .WillOnce(Invoke( + [](grpc::ClientContext *, const proto::collector::logs::v1::ExportLogsServiceRequest &, + proto::collector::logs::v1::ExportLogsServiceResponse *response) -> grpc::Status { + response->mutable_partial_success()->set_rejected_log_records(21); + response->mutable_partial_success()->set_error_message("too many logs!!"); + return grpc::Status::OK; + })); + + auto result = exporter->Export(batch); + exporter->ForceFlush(); + + EXPECT_EQ(sdk::common::ExportResult::kSuccess, result); + + auto entries = log.Drain(); + auto contains = [&](const std::string &needle) { + return std::any_of(entries.begin(), entries.end(), [&](const ScopedTestLogHandler::Entry &e) { + return e.msg.find(needle) != std::string::npos; + }); + }; + EXPECT_TRUE(contains("partial success")); + EXPECT_TRUE(contains("21 log record(s) rejected")); + EXPECT_TRUE(contains("too many logs!!")); +} + } // namespace otlp } // namespace exporter OPENTELEMETRY_END_NAMESPACE diff --git a/exporters/otlp/test/otlp_grpc_metric_exporter_test.cc b/exporters/otlp/test/otlp_grpc_metric_exporter_test.cc index 098c2c85c0..4f9367d7fe 100644 --- a/exporters/otlp/test/otlp_grpc_metric_exporter_test.cc +++ b/exporters/otlp/test/otlp_grpc_metric_exporter_test.cc @@ -28,12 +28,26 @@ # include "opentelemetry/exporters/otlp/otlp_grpc_client.h" # include "opentelemetry/exporters/otlp/otlp_grpc_client_factory.h" +# include "opentelemetry/common/timestamp.h" +# include "opentelemetry/nostd/shared_ptr.h" +# include "opentelemetry/sdk/common/global_log_handler.h" +# include "opentelemetry/sdk/instrumentationscope/instrumentation_scope.h" +# include "opentelemetry/sdk/metrics/data/metric_data.h" +# include "opentelemetry/sdk/metrics/data/point_data.h" +# include "opentelemetry/sdk/metrics/export/metric_producer.h" +# include "opentelemetry/sdk/metrics/instruments.h" +# include "opentelemetry/sdk/resource/resource.h" # include "opentelemetry/sdk/trace/simple_processor.h" # include "opentelemetry/sdk/trace/tracer_provider.h" +# include "opentelemetry/test_common/sdk/common/scoped_test_log_handler.h" # include "opentelemetry/trace/provider.h" # include # include +# include +# include +# include +# include # if defined(_MSC_VER) # include "opentelemetry/sdk/common/env_variables.h" @@ -49,6 +63,75 @@ namespace exporter namespace otlp { +namespace +{ +class OtlpMockMetricsServiceStub : public proto::collector::metrics::v1::MockMetricsServiceStub +{ +public: +// Some old toolchains can only use gRPC 1.33 and it's experimental. +# if defined(GRPC_CPP_VERSION_MAJOR) && \ + (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 + using async_interface_base = + proto::collector::metrics::v1::MetricsService::StubInterface::async_interface; +# else + using async_interface_base = + proto::collector::metrics::v1::MetricsService::StubInterface::experimental_async_interface; +# endif + + OtlpMockMetricsServiceStub() : async_interface_(this) {} + + class async_interface : public async_interface_base + { + public: + async_interface(OtlpMockMetricsServiceStub *owner) : stub_(owner) {} + + void Export( + ::grpc::ClientContext *context, + const ::opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest *request, + ::opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceResponse *response, + std::function callback) override + { + stub_->last_async_status_ = stub_->Export(context, *request, response); + callback(stub_->last_async_status_); + } + +// Some old toolchains can only use gRPC 1.33 and it's experimental. +# if defined(GRPC_CPP_VERSION_MAJOR) && \ + (GRPC_CPP_VERSION_MAJOR * 1000 + GRPC_CPP_VERSION_MINOR) >= 1039 || \ + defined(GRPC_CALLBACK_API_NONEXPERIMENTAL) + void Export( + ::grpc::ClientContext * /*context*/, + const ::opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest + * /*request*/, + ::opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceResponse * /*response*/, + ::grpc::ClientUnaryReactor * /*reactor*/) override + {} +# else + void Export( + ::grpc::ClientContext * /*context*/, + const ::opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest + * /*request*/, + ::opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceResponse * /*response*/, + ::grpc::experimental::ClientUnaryReactor * /*reactor*/) + {} +# endif + + private: + OtlpMockMetricsServiceStub *stub_; + }; + + async_interface_base *async() override { return &async_interface_; } + + ::grpc::Status GetLastAsyncStatus() const noexcept { return last_async_status_; } + +private: + ::grpc::Status last_async_status_; + async_interface async_interface_; +}; + +using opentelemetry::test_common::ScopedTestLogHandler; +} // namespace + class OtlpGrpcMetricExporterTestPeer : public ::testing::Test { public: @@ -300,6 +383,61 @@ TEST_F(OtlpGrpcMetricExporterTestPeer, CheckGetAggregationTemporality) exporter3->GetAggregationTemporality(opentelemetry::sdk::metrics::InstrumentType::kCounter)); } +// Exporter logs the rejection on partial_success. +TEST_F(OtlpGrpcMetricExporterTestPeer, ExportPartialSuccess) +{ + ScopedTestLogHandler log{sdk::common::internal_log::LogLevel::Error}; + + auto mock_stub = new OtlpMockMetricsServiceStub(); + std::unique_ptr stub_interface( + mock_stub); + auto exporter = GetExporter(std::move(stub_interface)); + + opentelemetry::sdk::metrics::SumPointData sum_point_data{}; + sum_point_data.value_ = 10.0; + opentelemetry::sdk::metrics::ResourceMetrics data; + auto resource = opentelemetry::sdk::resource::Resource::Create( + opentelemetry::sdk::resource::ResourceAttributes{}); + data.resource_ = &resource; + auto scope = opentelemetry::sdk::instrumentationscope::InstrumentationScope::Create( + "library_name", "1.5.0"); + opentelemetry::sdk::metrics::MetricData metric_data{ + opentelemetry::sdk::metrics::InstrumentDescriptor{ + "metrics_name", "description", "unit", + opentelemetry::sdk::metrics::InstrumentType::kCounter, + opentelemetry::sdk::metrics::InstrumentValueType::kDouble}, + opentelemetry::sdk::metrics::AggregationTemporality::kDelta, + opentelemetry::common::SystemTimestamp{}, opentelemetry::common::SystemTimestamp{}, + std::vector{ + {opentelemetry::sdk::metrics::PointAttributes{}, sum_point_data}}}; + data.scope_metric_data_ = std::vector{ + {scope.get(), std::vector{metric_data}}}; + + EXPECT_CALL(*mock_stub, Export(_, _, _)) + .Times(Exactly(1)) + .WillOnce(Invoke([](grpc::ClientContext *, + const proto::collector::metrics::v1::ExportMetricsServiceRequest &, + proto::collector::metrics::v1::ExportMetricsServiceResponse *response) + -> grpc::Status { + response->mutable_partial_success()->set_rejected_data_points(21); + response->mutable_partial_success()->set_error_message("too many data points!!"); + return grpc::Status::OK; + })); + + EXPECT_EQ(opentelemetry::sdk::common::ExportResult::kSuccess, exporter->Export(data)); + exporter->ForceFlush(); + + auto entries = log.Drain(); + auto contains = [&](const std::string &needle) { + return std::any_of(entries.begin(), entries.end(), [&](const ScopedTestLogHandler::Entry &e) { + return e.msg.find(needle) != std::string::npos; + }); + }; + EXPECT_TRUE(contains("partial success")); + EXPECT_TRUE(contains("21 data point(s) rejected")); + EXPECT_TRUE(contains("too many data points!!")); +} + } // namespace otlp } // namespace exporter OPENTELEMETRY_END_NAMESPACE diff --git a/exporters/otlp/test/otlp_http_exporter_test.cc b/exporters/otlp/test/otlp_http_exporter_test.cc index 3cbfa9d16c..cac072b48f 100644 --- a/exporters/otlp/test/otlp_http_exporter_test.cc +++ b/exporters/otlp/test/otlp_http_exporter_test.cc @@ -17,6 +17,7 @@ # include "opentelemetry/ext/http/client/http_client_factory.h" # include "opentelemetry/ext/http/server/http_server.h" +# include "opentelemetry/sdk/common/global_log_handler.h" # include "opentelemetry/sdk/trace/batch_span_processor.h" # include "opentelemetry/sdk/trace/batch_span_processor_options.h" # include "opentelemetry/sdk/trace/simple_processor.h" @@ -25,9 +26,14 @@ # include "opentelemetry/sdk/trace/tracer_provider_factory.h" # include "opentelemetry/test_common/ext/http/client/http_client_test_factory.h" # include "opentelemetry/test_common/ext/http/client/nosend/http_client_nosend.h" +# include "opentelemetry/test_common/sdk/common/scoped_test_log_handler.h" # include "opentelemetry/trace/provider.h" # include "opentelemetry/trace/tracer_provider.h" +# include +# include +# include + # include # include # include "gmock/gmock.h" @@ -47,6 +53,20 @@ namespace exporter namespace otlp { +namespace +{ +class ProtobufGlobalSymbolGuard +{ +public: + ProtobufGlobalSymbolGuard() = default; + ~ProtobufGlobalSymbolGuard() { google::protobuf::ShutdownProtobufLibrary(); } + ProtobufGlobalSymbolGuard(const ProtobufGlobalSymbolGuard &) = delete; + ProtobufGlobalSymbolGuard &operator=(const ProtobufGlobalSymbolGuard &) = delete; + ProtobufGlobalSymbolGuard(ProtobufGlobalSymbolGuard &&) = delete; + ProtobufGlobalSymbolGuard &operator=(ProtobufGlobalSymbolGuard &&) = delete; +}; +} // namespace + namespace trace_api = opentelemetry::trace; namespace resource = opentelemetry::sdk::resource; @@ -564,7 +584,7 @@ TEST_F(OtlpHttpExporterTestPeer, ExportJsonIntegrationTestSync) TEST_F(OtlpHttpExporterTestPeer, ExportJsonIntegrationTestAsync) { ExportJsonIntegrationTestAsync(); - google::protobuf::ShutdownProtobufLibrary(); + static ProtobufGlobalSymbolGuard global_symbol_guard; } # endif @@ -858,6 +878,90 @@ TEST_P(OtlpHttpExporterRetryIntegrationTests, StatusCodes) } # endif // ENABLE_OTLP_RETRY_PREVIEW +using opentelemetry::test_common::ScopedTestLogHandler; + +// Exporter logs the rejection on partial_success. +TEST_F(OtlpHttpExporterTestPeer, ExportPartialSuccess) +{ + ScopedTestLogHandler log{sdk::common::internal_log::LogLevel::Error}; + + proto::collector::trace::v1::ExportTraceServiceResponse partial; + partial.mutable_partial_success()->set_rejected_spans(21); + partial.mutable_partial_success()->set_error_message("too many spans!!"); + std::string serialized = partial.SerializeAsString(); + + auto mock_otlp_client = + OtlpHttpExporterTestPeer::GetMockOtlpHttpClient(HttpRequestContentType::kBinary); + auto exporter = GetExporter(std::unique_ptr{mock_otlp_client.first}); + + auto no_send_client = + std::static_pointer_cast(mock_otlp_client.second); + auto mock_session = + std::static_pointer_cast(no_send_client->session_); + + EXPECT_CALL(*mock_session, SendRequest) + .WillOnce([&serialized](const std::shared_ptr &callback) { + http_client::nosend::Response response; + response.body_.assign(serialized.begin(), serialized.end()); + response.Finish(*callback.get()); + }); + + auto recordable = exporter->MakeRecordable(); + recordable->SetName("Test span"); + nostd::span> batch(&recordable, 1); + EXPECT_EQ(sdk::common::ExportResult::kSuccess, exporter->Export(batch)); + + auto entries = log.Drain(); + auto contains = [&](const std::string &needle) { + return std::any_of(entries.begin(), entries.end(), [&](const ScopedTestLogHandler::Entry &e) { + return e.msg.find(needle) != std::string::npos; + }); + }; + EXPECT_TRUE(contains("partial success")); + EXPECT_TRUE(contains("21 span(s) rejected")); + EXPECT_TRUE(contains("too many spans!!")); +} + +// Exporter logs the rejection on partial_success when the response is JSON encoded. +TEST_F(OtlpHttpExporterTestPeer, ExportPartialSuccessJson) +{ + ScopedTestLogHandler log{sdk::common::internal_log::LogLevel::Error}; + + std::string serialized = + R"({"partialSuccess":{"rejectedSpans":"21","errorMessage":"too many spans!!"}})"; + + auto mock_otlp_client = + OtlpHttpExporterTestPeer::GetMockOtlpHttpClient(HttpRequestContentType::kJson); + auto exporter = GetExporter(std::unique_ptr{mock_otlp_client.first}); + + auto no_send_client = + std::static_pointer_cast(mock_otlp_client.second); + auto mock_session = + std::static_pointer_cast(no_send_client->session_); + + EXPECT_CALL(*mock_session, SendRequest) + .WillOnce([&serialized](const std::shared_ptr &callback) { + http_client::nosend::Response response; + response.body_.assign(serialized.begin(), serialized.end()); + response.Finish(*callback.get()); + }); + + auto recordable = exporter->MakeRecordable(); + recordable->SetName("Test span"); + nostd::span> batch(&recordable, 1); + EXPECT_EQ(sdk::common::ExportResult::kSuccess, exporter->Export(batch)); + + auto entries = log.Drain(); + auto contains = [&](const std::string &needle) { + return std::any_of(entries.begin(), entries.end(), [&](const ScopedTestLogHandler::Entry &e) { + return e.msg.find(needle) != std::string::npos; + }); + }; + EXPECT_TRUE(contains("partial success")); + EXPECT_TRUE(contains("21 span(s) rejected")); + EXPECT_TRUE(contains("too many spans!!")); +} + } // namespace otlp } // namespace exporter OPENTELEMETRY_END_NAMESPACE diff --git a/exporters/otlp/test/otlp_http_log_record_exporter_test.cc b/exporters/otlp/test/otlp_http_log_record_exporter_test.cc index 3fca7a06ea..e28c0da02b 100644 --- a/exporters/otlp/test/otlp_http_log_record_exporter_test.cc +++ b/exporters/otlp/test/otlp_http_log_record_exporter_test.cc @@ -3,8 +3,11 @@ #ifndef OPENTELEMETRY_STL_VERSION +# include # include # include +# include +# include # include "opentelemetry/exporters/otlp/otlp_http_log_record_exporter.h" @@ -18,12 +21,15 @@ # include "opentelemetry/ext/http/client/http_client_factory.h" # include "opentelemetry/ext/http/server/http_server.h" # include "opentelemetry/logs/provider.h" +# include "opentelemetry/nostd/shared_ptr.h" +# include "opentelemetry/sdk/common/global_log_handler.h" # include "opentelemetry/sdk/logs/batch_log_record_processor.h" # include "opentelemetry/sdk/logs/exporter.h" # include "opentelemetry/sdk/logs/logger_provider.h" # include "opentelemetry/sdk/resource/resource.h" # include "opentelemetry/test_common/ext/http/client/http_client_test_factory.h" # include "opentelemetry/test_common/ext/http/client/nosend/http_client_nosend.h" +# include "opentelemetry/test_common/sdk/common/scoped_test_log_handler.h" # include # include @@ -45,6 +51,20 @@ namespace exporter namespace otlp { +namespace +{ +class ProtobufGlobalSymbolGuard +{ +public: + ProtobufGlobalSymbolGuard() = default; + ~ProtobufGlobalSymbolGuard() { google::protobuf::ShutdownProtobufLibrary(); } + ProtobufGlobalSymbolGuard(const ProtobufGlobalSymbolGuard &) = delete; + ProtobufGlobalSymbolGuard &operator=(const ProtobufGlobalSymbolGuard &) = delete; + ProtobufGlobalSymbolGuard(ProtobufGlobalSymbolGuard &&) = delete; + ProtobufGlobalSymbolGuard &operator=(ProtobufGlobalSymbolGuard &&) = delete; +}; +} // namespace + template static nostd::span MakeSpan(T (&array)[N]) { @@ -88,6 +108,8 @@ static OtlpHttpClientOptions MakeOtlpHttpClientOptions(HttpRequestContentType co namespace http_client = opentelemetry::ext::http::client; +using opentelemetry::test_common::ScopedTestLogHandler; + class OtlpHttpLogRecordExporterTestPeer : public ::testing::Test { public: @@ -673,7 +695,7 @@ TEST_F(OtlpHttpLogRecordExporterTestPeer, ExportJsonIntegrationTestSync) TEST_F(OtlpHttpLogRecordExporterTestPeer, ExportJsonIntegrationTestAsync) { ExportJsonIntegrationTestAsync(); - google::protobuf::ShutdownProtobufLibrary(); + static ProtobufGlobalSymbolGuard global_symbol_guard; } # endif @@ -869,6 +891,86 @@ TEST_F(OtlpHttpLogRecordExporterTestPeer, ConfigRetryGenericValuesFromEnv) } # endif // NO_GETENV +// Exporter logs the rejection on partial_success. +TEST_F(OtlpHttpLogRecordExporterTestPeer, ExportPartialSuccess) +{ + ScopedTestLogHandler log{sdk::common::internal_log::LogLevel::Error}; + + proto::collector::logs::v1::ExportLogsServiceResponse partial; + partial.mutable_partial_success()->set_rejected_log_records(21); + partial.mutable_partial_success()->set_error_message("too many logs!!"); + std::string serialized = partial.SerializeAsString(); + + auto mock_otlp_client = + OtlpHttpLogRecordExporterTestPeer::GetMockOtlpHttpClient(HttpRequestContentType::kBinary); + auto exporter = GetExporter(std::unique_ptr{mock_otlp_client.first}); + + auto no_send_client = + std::static_pointer_cast(mock_otlp_client.second); + auto mock_session = + std::static_pointer_cast(no_send_client->session_); + + EXPECT_CALL(*mock_session, SendRequest) + .WillOnce([&serialized](const std::shared_ptr &callback) { + http_client::nosend::Response response; + response.body_.assign(serialized.begin(), serialized.end()); + response.Finish(*callback.get()); + }); + + auto recordable = exporter->MakeRecordable(); + nostd::span> batch(&recordable, 1); + EXPECT_EQ(opentelemetry::sdk::common::ExportResult::kSuccess, exporter->Export(batch)); + + auto entries = log.Drain(); + auto contains = [&](const std::string &needle) { + return std::any_of(entries.begin(), entries.end(), [&](const ScopedTestLogHandler::Entry &e) { + return e.msg.find(needle) != std::string::npos; + }); + }; + EXPECT_TRUE(contains("partial success")); + EXPECT_TRUE(contains("21 log record(s) rejected")); + EXPECT_TRUE(contains("too many logs!!")); +} + +// Exporter logs the rejection on partial_success when the response is JSON encoded. +TEST_F(OtlpHttpLogRecordExporterTestPeer, ExportPartialSuccessJson) +{ + ScopedTestLogHandler log{sdk::common::internal_log::LogLevel::Error}; + + std::string serialized = + R"({"partialSuccess":{"rejectedLogRecords":"21","errorMessage":"too many logs!!"}})"; + + auto mock_otlp_client = + OtlpHttpLogRecordExporterTestPeer::GetMockOtlpHttpClient(HttpRequestContentType::kJson); + auto exporter = GetExporter(std::unique_ptr{mock_otlp_client.first}); + + auto no_send_client = + std::static_pointer_cast(mock_otlp_client.second); + auto mock_session = + std::static_pointer_cast(no_send_client->session_); + + EXPECT_CALL(*mock_session, SendRequest) + .WillOnce([&serialized](const std::shared_ptr &callback) { + http_client::nosend::Response response; + response.body_.assign(serialized.begin(), serialized.end()); + response.Finish(*callback.get()); + }); + + auto recordable = exporter->MakeRecordable(); + nostd::span> batch(&recordable, 1); + EXPECT_EQ(opentelemetry::sdk::common::ExportResult::kSuccess, exporter->Export(batch)); + + auto entries = log.Drain(); + auto contains = [&](const std::string &needle) { + return std::any_of(entries.begin(), entries.end(), [&](const ScopedTestLogHandler::Entry &e) { + return e.msg.find(needle) != std::string::npos; + }); + }; + EXPECT_TRUE(contains("partial success")); + EXPECT_TRUE(contains("21 log record(s) rejected")); + EXPECT_TRUE(contains("too many logs!!")); +} + } // namespace otlp } // namespace exporter OPENTELEMETRY_END_NAMESPACE diff --git a/exporters/otlp/test/otlp_http_metric_exporter_test.cc b/exporters/otlp/test/otlp_http_metric_exporter_test.cc index 929c7159e5..931dff8544 100644 --- a/exporters/otlp/test/otlp_http_metric_exporter_test.cc +++ b/exporters/otlp/test/otlp_http_metric_exporter_test.cc @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 #include +#include #include #include #include @@ -21,9 +22,10 @@ #include "opentelemetry/exporters/otlp/otlp_metric_utils.h" #include "opentelemetry/exporters/otlp/otlp_preferred_temporality.h" #include "opentelemetry/ext/http/client/http_client.h" +#include "opentelemetry/nostd/shared_ptr.h" #include "opentelemetry/nostd/string_view.h" -#include "opentelemetry/nostd/unique_ptr.h" #include "opentelemetry/sdk/common/exporter_utils.h" +#include "opentelemetry/sdk/common/global_log_handler.h" #include "opentelemetry/sdk/common/thread_instrumentation.h" #include "opentelemetry/sdk/instrumentationscope/instrumentation_scope.h" #include "opentelemetry/sdk/metrics/data/metric_data.h" @@ -34,6 +36,7 @@ #include "opentelemetry/sdk/resource/resource.h" #include "opentelemetry/test_common/ext/http/client/http_client_test_factory.h" #include "opentelemetry/test_common/ext/http/client/nosend/http_client_nosend.h" +#include "opentelemetry/test_common/sdk/common/scoped_test_log_handler.h" #include "opentelemetry/version.h" // clang-format off @@ -60,6 +63,20 @@ namespace exporter namespace otlp { +namespace +{ +class ProtobufGlobalSymbolGuard +{ +public: + ProtobufGlobalSymbolGuard() = default; + ~ProtobufGlobalSymbolGuard() { google::protobuf::ShutdownProtobufLibrary(); } + ProtobufGlobalSymbolGuard(const ProtobufGlobalSymbolGuard &) = delete; + ProtobufGlobalSymbolGuard &operator=(const ProtobufGlobalSymbolGuard &) = delete; + ProtobufGlobalSymbolGuard(ProtobufGlobalSymbolGuard &&) = delete; + ProtobufGlobalSymbolGuard &operator=(ProtobufGlobalSymbolGuard &&) = delete; +}; +} // namespace + template static IntegerType JsonToInteger(const nlohmann::json &value) { @@ -108,6 +125,8 @@ static OtlpHttpClientOptions MakeOtlpHttpClientOptions(HttpRequestContentType co namespace http_client = opentelemetry::ext::http::client; +using opentelemetry::test_common::ScopedTestLogHandler; + class OtlpHttpMetricExporterTestPeer : public ::testing::Test { public: @@ -975,7 +994,7 @@ TEST_F(OtlpHttpMetricExporterTestPeer, ConfigJsonBytesMappingTest) opts.json_bytes_mapping = JsonBytesMappingKind::kHex; std::unique_ptr exporter(new OtlpHttpMetricExporter(opts)); EXPECT_EQ(GetOptions(exporter).json_bytes_mapping, JsonBytesMappingKind::kHex); - google::protobuf::ShutdownProtobufLibrary(); + static ProtobufGlobalSymbolGuard global_symbol_guard; } TEST(OtlpHttpMetricExporterTest, ConfigDefaultProtocolTest) @@ -1223,6 +1242,122 @@ TEST_F(OtlpHttpMetricExporterTestPeer, PreferredAggergationTemporality) opentelemetry::sdk::metrics::AggregationTemporality::kCumulative); } +// Exporter logs the rejection on partial_success. +TEST_F(OtlpHttpMetricExporterTestPeer, ExportPartialSuccess) +{ + ScopedTestLogHandler log{sdk::common::internal_log::LogLevel::Error}; + + auto mock_otlp_client = + OtlpHttpMetricExporterTestPeer::GetMockOtlpHttpClient(HttpRequestContentType::kBinary); + auto exporter = GetExporter(std::unique_ptr{mock_otlp_client.first}); + + auto no_send_client = + std::static_pointer_cast(mock_otlp_client.second); + auto mock_session = + std::static_pointer_cast(no_send_client->session_); + + opentelemetry::sdk::metrics::SumPointData sum_point_data{}; + sum_point_data.value_ = 10.0; + opentelemetry::sdk::metrics::ResourceMetrics data; + auto resource = opentelemetry::sdk::resource::Resource::Create( + opentelemetry::sdk::resource::ResourceAttributes{}); + data.resource_ = &resource; + auto scope = opentelemetry::sdk::instrumentationscope::InstrumentationScope::Create( + "library_name", "1.5.0"); + opentelemetry::sdk::metrics::MetricData metric_data{ + opentelemetry::sdk::metrics::InstrumentDescriptor{ + "metrics_name", "description", "unit", + opentelemetry::sdk::metrics::InstrumentType::kCounter, + opentelemetry::sdk::metrics::InstrumentValueType::kDouble}, + opentelemetry::sdk::metrics::AggregationTemporality::kDelta, + opentelemetry::common::SystemTimestamp{}, opentelemetry::common::SystemTimestamp{}, + std::vector{ + {opentelemetry::sdk::metrics::PointAttributes{}, sum_point_data}}}; + data.scope_metric_data_ = std::vector{ + {scope.get(), std::vector{metric_data}}}; + + proto::collector::metrics::v1::ExportMetricsServiceResponse partial; + partial.mutable_partial_success()->set_rejected_data_points(21); + partial.mutable_partial_success()->set_error_message("too many data points!!"); + std::string serialized = partial.SerializeAsString(); + + EXPECT_CALL(*mock_session, SendRequest) + .WillOnce([&serialized](const std::shared_ptr &callback) { + http_client::nosend::Response response; + response.body_.assign(serialized.begin(), serialized.end()); + response.Finish(*callback.get()); + }); + + EXPECT_EQ(opentelemetry::sdk::common::ExportResult::kSuccess, exporter->Export(data)); + + auto entries = log.Drain(); + auto contains = [&](const std::string &needle) { + return std::any_of(entries.begin(), entries.end(), [&](const ScopedTestLogHandler::Entry &e) { + return e.msg.find(needle) != std::string::npos; + }); + }; + EXPECT_TRUE(contains("partial success")); + EXPECT_TRUE(contains("21 data point(s) rejected")); + EXPECT_TRUE(contains("too many data points!!")); +} + +// Exporter logs the rejection on partial_success when the response is JSON encoded. +TEST_F(OtlpHttpMetricExporterTestPeer, ExportPartialSuccessJson) +{ + ScopedTestLogHandler log{sdk::common::internal_log::LogLevel::Error}; + + auto mock_otlp_client = + OtlpHttpMetricExporterTestPeer::GetMockOtlpHttpClient(HttpRequestContentType::kJson); + auto exporter = GetExporter(std::unique_ptr{mock_otlp_client.first}); + + auto no_send_client = + std::static_pointer_cast(mock_otlp_client.second); + auto mock_session = + std::static_pointer_cast(no_send_client->session_); + + opentelemetry::sdk::metrics::SumPointData sum_point_data{}; + sum_point_data.value_ = 10.0; + opentelemetry::sdk::metrics::ResourceMetrics data; + auto resource = opentelemetry::sdk::resource::Resource::Create( + opentelemetry::sdk::resource::ResourceAttributes{}); + data.resource_ = &resource; + auto scope = opentelemetry::sdk::instrumentationscope::InstrumentationScope::Create( + "library_name", "1.5.0"); + opentelemetry::sdk::metrics::MetricData metric_data{ + opentelemetry::sdk::metrics::InstrumentDescriptor{ + "metrics_name", "description", "unit", + opentelemetry::sdk::metrics::InstrumentType::kCounter, + opentelemetry::sdk::metrics::InstrumentValueType::kDouble}, + opentelemetry::sdk::metrics::AggregationTemporality::kDelta, + opentelemetry::common::SystemTimestamp{}, opentelemetry::common::SystemTimestamp{}, + std::vector{ + {opentelemetry::sdk::metrics::PointAttributes{}, sum_point_data}}}; + data.scope_metric_data_ = std::vector{ + {scope.get(), std::vector{metric_data}}}; + + std::string serialized = + R"({"partialSuccess":{"rejectedDataPoints":"21","errorMessage":"too many data points!!"}})"; + + EXPECT_CALL(*mock_session, SendRequest) + .WillOnce([&serialized](const std::shared_ptr &callback) { + http_client::nosend::Response response; + response.body_.assign(serialized.begin(), serialized.end()); + response.Finish(*callback.get()); + }); + + EXPECT_EQ(opentelemetry::sdk::common::ExportResult::kSuccess, exporter->Export(data)); + + auto entries = log.Drain(); + auto contains = [&](const std::string &needle) { + return std::any_of(entries.begin(), entries.end(), [&](const ScopedTestLogHandler::Entry &e) { + return e.msg.find(needle) != std::string::npos; + }); + }; + EXPECT_TRUE(contains("partial success")); + EXPECT_TRUE(contains("21 data point(s) rejected")); + EXPECT_TRUE(contains("too many data points!!")); +} + } // namespace otlp } // namespace exporter OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/test/metrics/BUILD b/sdk/test/metrics/BUILD index 40ffbc8b62..05ee010076 100644 --- a/sdk/test/metrics/BUILD +++ b/sdk/test/metrics/BUILD @@ -47,6 +47,7 @@ cc_test( ], deps = [ "metrics_common_test_utils", + "//test_common:headers", "@com_google_googletest//:gtest_main", ], ) diff --git a/sdk/test/metrics/CMakeLists.txt b/sdk/test/metrics/CMakeLists.txt index 63e6e58d31..4289e17ea2 100644 --- a/sdk/test/metrics/CMakeLists.txt +++ b/sdk/test/metrics/CMakeLists.txt @@ -38,8 +38,13 @@ foreach( instrument_descriptor_test) add_executable(${testname} "${testname}.cc") target_link_libraries( - ${testname} ${GTEST_BOTH_LIBRARIES} ${GMOCK_LIB} ${CMAKE_THREAD_LIBS_INIT} - metrics_common_test_utils opentelemetry_resources) + ${testname} + ${GTEST_BOTH_LIBRARIES} + ${GMOCK_LIB} + ${CMAKE_THREAD_LIBS_INIT} + metrics_common_test_utils + opentelemetry_resources + opentelemetry_test_common) target_compile_definitions(${testname} PRIVATE UNIT_TESTING) gtest_add_tests( TARGET ${testname} diff --git a/sdk/test/metrics/meter_provider_sdk_test.cc b/sdk/test/metrics/meter_provider_sdk_test.cc index 1251a19e56..dd0e02920c 100644 --- a/sdk/test/metrics/meter_provider_sdk_test.cc +++ b/sdk/test/metrics/meter_provider_sdk_test.cc @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 #include -#include +#include #include #include #include @@ -10,7 +10,6 @@ #include "opentelemetry/common/macros.h" #include "opentelemetry/metrics/meter.h" -#include "opentelemetry/nostd/shared_ptr.h" #include "opentelemetry/nostd/string_view.h" #include "opentelemetry/sdk/common/global_log_handler.h" #include "opentelemetry/sdk/metrics/instruments.h" @@ -22,6 +21,7 @@ #include "opentelemetry/sdk/metrics/view/instrument_selector.h" #include "opentelemetry/sdk/metrics/view/meter_selector.h" #include "opentelemetry/sdk/metrics/view/view.h" +#include "opentelemetry/test_common/sdk/common/scoped_test_log_handler.h" #if OPENTELEMETRY_ABI_VERSION_NO >= 2 # include @@ -37,77 +37,7 @@ using namespace opentelemetry::sdk::metrics; using namespace opentelemetry::sdk::common::internal_log; - -namespace -{ - -class ScopedTestLogHandler final -{ -public: - struct Entry - { - LogLevel level; - std::string msg; - }; - -private: - class LogHandlerImpl : public LogHandler - { - public: - void Handle(LogLevel level, - const char * /*file*/, - int /*line*/, - const char *msg, - const opentelemetry::sdk::common::AttributeMap & /*attrs*/) noexcept override - { - std::lock_guard lk(mu_); - entries_.push_back({level, msg ? msg : ""}); - } - - std::vector Drain() - { - std::lock_guard lk(mu_); - return std::exchange(entries_, {}); - } - - private: - std::mutex mu_; - std::vector entries_; - }; - -public: - explicit ScopedTestLogHandler(LogLevel level) - : previous_handler_(GlobalLogHandler::GetLogHandler()), - previous_level_(GlobalLogHandler::GetLogLevel()) - { - opentelemetry::nostd::shared_ptr handler{new LogHandlerImpl{}}; - handler_ = handler.get(); - - GlobalLogHandler::SetLogHandler(std::move(handler)); - GlobalLogHandler::SetLogLevel(level); - } - - ~ScopedTestLogHandler() - { - handler_ = nullptr; - GlobalLogHandler::SetLogHandler(previous_handler_); - GlobalLogHandler::SetLogLevel(previous_level_); - } - - ScopedTestLogHandler(const ScopedTestLogHandler &) = delete; - ScopedTestLogHandler &operator=(const ScopedTestLogHandler &) = delete; - ScopedTestLogHandler(ScopedTestLogHandler &&) = delete; - ScopedTestLogHandler &operator=(ScopedTestLogHandler &&) = delete; - - std::vector Drain() { return handler_->Drain(); } - -private: - LogHandlerImpl *handler_{nullptr}; - opentelemetry::nostd::shared_ptr previous_handler_; - LogLevel previous_level_; -}; - -} // namespace +using opentelemetry::test_common::ScopedTestLogHandler; TEST(MeterProvider, GetMeter) { diff --git a/test_common/include/opentelemetry/test_common/sdk/common/scoped_test_log_handler.h b/test_common/include/opentelemetry/test_common/sdk/common/scoped_test_log_handler.h new file mode 100644 index 0000000000..398ad4f16f --- /dev/null +++ b/test_common/include/opentelemetry/test_common/sdk/common/scoped_test_log_handler.h @@ -0,0 +1,86 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include +#include +#include +#include + +#include "opentelemetry/nostd/shared_ptr.h" +#include "opentelemetry/sdk/common/attribute_utils.h" +#include "opentelemetry/sdk/common/global_log_handler.h" +#include "opentelemetry/version.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace test_common +{ + +class ScopedTestLogHandler final +{ +public: + struct Entry + { + sdk::common::internal_log::LogLevel level; + std::string msg; + }; + +private: + class LogHandlerImpl : public sdk::common::internal_log::LogHandler + { + public: + void Handle(sdk::common::internal_log::LogLevel level, + const char * /*file*/, + int /*line*/, + const char *msg, + const sdk::common::AttributeMap & /*attrs*/) noexcept override + { + std::lock_guard lk(mu_); + entries_.push_back({level, msg ? msg : ""}); + } + + std::vector Drain() + { + std::lock_guard lk(mu_); + return std::exchange(entries_, {}); + } + + private: + std::mutex mu_; + std::vector entries_; + }; + +public: + explicit ScopedTestLogHandler(sdk::common::internal_log::LogLevel level) + : previous_handler_(sdk::common::internal_log::GlobalLogHandler::GetLogHandler()), + previous_level_(sdk::common::internal_log::GlobalLogHandler::GetLogLevel()) + { + opentelemetry::nostd::shared_ptr handler{new LogHandlerImpl{}}; + handler_ = handler.get(); + sdk::common::internal_log::GlobalLogHandler::SetLogHandler(std::move(handler)); + sdk::common::internal_log::GlobalLogHandler::SetLogLevel(level); + } + + ~ScopedTestLogHandler() + { + handler_ = nullptr; + sdk::common::internal_log::GlobalLogHandler::SetLogHandler(previous_handler_); + sdk::common::internal_log::GlobalLogHandler::SetLogLevel(previous_level_); + } + + ScopedTestLogHandler(const ScopedTestLogHandler &) = delete; + ScopedTestLogHandler &operator=(const ScopedTestLogHandler &) = delete; + ScopedTestLogHandler(ScopedTestLogHandler &&) = delete; + ScopedTestLogHandler &operator=(ScopedTestLogHandler &&) = delete; + + std::vector Drain() { return handler_->Drain(); } + +private: + LogHandlerImpl *handler_{nullptr}; + opentelemetry::nostd::shared_ptr previous_handler_; + sdk::common::internal_log::LogLevel previous_level_; +}; + +} // namespace test_common +OPENTELEMETRY_END_NAMESPACE