From 505ddbe541187eb9767ed35be1ef62958f4c1f60 Mon Sep 17 00:00:00 2001 From: Feiyang Li Date: Fri, 23 Jan 2026 17:12:39 +0800 Subject: [PATCH] refactor(rest): switch HttpClient to use connection pool --- .../IcebergThirdpartyToolchain.cmake | 2 +- src/iceberg/catalog/rest/http_client.cc | 97 ++++--------------- src/iceberg/catalog/rest/http_client.h | 14 +-- subprojects/cpr.wrap | 21 ++-- 4 files changed, 35 insertions(+), 99 deletions(-) diff --git a/cmake_modules/IcebergThirdpartyToolchain.cmake b/cmake_modules/IcebergThirdpartyToolchain.cmake index 289caa400..dd1f8a87e 100644 --- a/cmake_modules/IcebergThirdpartyToolchain.cmake +++ b/cmake_modules/IcebergThirdpartyToolchain.cmake @@ -449,7 +449,7 @@ function(resolve_cpr_dependency) if(DEFINED ENV{ICEBERG_CPR_URL}) set(CPR_URL "$ENV{ICEBERG_CPR_URL}") else() - set(CPR_URL "https://github.com/libcpr/cpr/archive/refs/tags/1.12.0.tar.gz") + set(CPR_URL "https://github.com/libcpr/cpr/archive/refs/tags/1.14.1.tar.gz") endif() fetchcontent_declare(cpr diff --git a/src/iceberg/catalog/rest/http_client.cc b/src/iceberg/catalog/rest/http_client.cc index 84d458b98..41be14cec 100644 --- a/src/iceberg/catalog/rest/http_client.cc +++ b/src/iceberg/catalog/rest/http_client.cc @@ -134,41 +134,9 @@ Status HandleFailureResponse(const cpr::Response& response, } // namespace -void HttpClient::PrepareSession( - const std::string& path, HttpMethod method, - const std::unordered_map& params, - const std::unordered_map& headers) { - session_->SetUrl(cpr::Url{path}); - session_->SetParameters(GetParameters(params)); - session_->RemoveContent(); - // clear lingering POST mode state from prior requests. CURLOPT_POST is implicitly set - // to 1 by POST requests, and this state is not reset by RemoveContent(), so we must - // manually enforce HTTP GET to clear it. - curl_easy_setopt(session_->GetCurlHolder()->handle, CURLOPT_HTTPGET, 1L); - switch (method) { - case HttpMethod::kGet: - session_->PrepareGet(); - break; - case HttpMethod::kPost: - session_->PreparePost(); - break; - case HttpMethod::kPut: - session_->PreparePut(); - break; - case HttpMethod::kDelete: - session_->PrepareDelete(); - break; - case HttpMethod::kHead: - session_->PrepareHead(); - break; - } - auto final_headers = MergeHeaders(default_headers_, headers); - session_->SetHeader(final_headers); -} - HttpClient::HttpClient(std::unordered_map default_headers) : default_headers_{std::move(default_headers)}, - session_{std::make_unique()} { + connection_pool_{std::make_unique()} { // Set default Content-Type for all requests (including GET/HEAD/DELETE). // Many systems require that content type is set regardless and will fail, // even on an empty bodied request. @@ -182,12 +150,9 @@ Result HttpClient::Get( const std::string& path, const std::unordered_map& params, const std::unordered_map& headers, const ErrorHandler& error_handler) { - cpr::Response response; - { - std::lock_guard guard(session_mutex_); - PrepareSession(path, HttpMethod::kGet, params, headers); - response = session_->Get(); - } + auto final_headers = MergeHeaders(default_headers_, headers); + cpr::Response response = + cpr::Get(cpr::Url{path}, GetParameters(params), final_headers, *connection_pool_); ICEBERG_RETURN_UNEXPECTED(HandleFailureResponse(response, error_handler)); HttpResponse http_response; @@ -199,13 +164,9 @@ Result HttpClient::Post( const std::string& path, const std::string& body, const std::unordered_map& headers, const ErrorHandler& error_handler) { - cpr::Response response; - { - std::lock_guard guard(session_mutex_); - PrepareSession(path, HttpMethod::kPost, /*params=*/{}, headers); - session_->SetBody(cpr::Body{body}); - response = session_->Post(); - } + auto final_headers = MergeHeaders(default_headers_, headers); + cpr::Response response = + cpr::Post(cpr::Url{path}, cpr::Body{body}, final_headers, *connection_pool_); ICEBERG_RETURN_UNEXPECTED(HandleFailureResponse(response, error_handler)); HttpResponse http_response; @@ -218,25 +179,16 @@ Result HttpClient::PostForm( const std::unordered_map& form_data, const std::unordered_map& headers, const ErrorHandler& error_handler) { - cpr::Response response; - - { - std::lock_guard guard(session_mutex_); - - // Override default Content-Type (application/json) with form-urlencoded - auto form_headers = headers; - form_headers[kHeaderContentType] = kMimeTypeFormUrlEncoded; - - PrepareSession(path, HttpMethod::kPost, /*params=*/{}, form_headers); - std::vector pair_list; - pair_list.reserve(form_data.size()); - for (const auto& [key, val] : form_data) { - pair_list.emplace_back(key, val); - } - session_->SetPayload(cpr::Payload(pair_list.begin(), pair_list.end())); - - response = session_->Post(); + auto final_headers = MergeHeaders(default_headers_, headers); + final_headers.insert_or_assign(kHeaderContentType, kMimeTypeFormUrlEncoded); + std::vector pair_list; + pair_list.reserve(form_data.size()); + for (const auto& [key, val] : form_data) { + pair_list.emplace_back(key, val); } + cpr::Response response = + cpr::Post(cpr::Url{path}, cpr::Payload(pair_list.begin(), pair_list.end()), + final_headers, *connection_pool_); ICEBERG_RETURN_UNEXPECTED(HandleFailureResponse(response, error_handler)); HttpResponse http_response; @@ -247,12 +199,8 @@ Result HttpClient::PostForm( Result HttpClient::Head( const std::string& path, const std::unordered_map& headers, const ErrorHandler& error_handler) { - cpr::Response response; - { - std::lock_guard guard(session_mutex_); - PrepareSession(path, HttpMethod::kHead, /*params=*/{}, headers); - response = session_->Head(); - } + auto final_headers = MergeHeaders(default_headers_, headers); + cpr::Response response = cpr::Head(cpr::Url{path}, final_headers, *connection_pool_); ICEBERG_RETURN_UNEXPECTED(HandleFailureResponse(response, error_handler)); HttpResponse http_response; @@ -264,12 +212,9 @@ Result HttpClient::Delete( const std::string& path, const std::unordered_map& params, const std::unordered_map& headers, const ErrorHandler& error_handler) { - cpr::Response response; - { - std::lock_guard guard(session_mutex_); - PrepareSession(path, HttpMethod::kDelete, params, headers); - response = session_->Delete(); - } + auto final_headers = MergeHeaders(default_headers_, headers); + cpr::Response response = cpr::Delete(cpr::Url{path}, GetParameters(params), + final_headers, *connection_pool_); ICEBERG_RETURN_UNEXPECTED(HandleFailureResponse(response, error_handler)); HttpResponse http_response; diff --git a/src/iceberg/catalog/rest/http_client.h b/src/iceberg/catalog/rest/http_client.h index 84f8e5906..38f902e4d 100644 --- a/src/iceberg/catalog/rest/http_client.h +++ b/src/iceberg/catalog/rest/http_client.h @@ -21,11 +21,9 @@ #include #include -#include #include #include -#include "iceberg/catalog/rest/endpoint.h" #include "iceberg/catalog/rest/iceberg_rest_export.h" #include "iceberg/catalog/rest/type_fwd.h" #include "iceberg/result.h" @@ -34,7 +32,7 @@ /// \brief Http client for Iceberg REST API. namespace cpr { -class Session; +class ConnectionPool; } // namespace cpr namespace iceberg::rest { @@ -110,16 +108,8 @@ class ICEBERG_REST_EXPORT HttpClient { const ErrorHandler& error_handler); private: - void PrepareSession(const std::string& path, HttpMethod method, - const std::unordered_map& params, - const std::unordered_map& headers); - std::unordered_map default_headers_; - - // TODO(Li Feiyang): use connection pool to support external multi-threaded concurrent - // calls - std::unique_ptr session_; - mutable std::mutex session_mutex_; + std::unique_ptr connection_pool_; }; } // namespace iceberg::rest diff --git a/subprojects/cpr.wrap b/subprojects/cpr.wrap index a52ebadf6..d213e00ba 100644 --- a/subprojects/cpr.wrap +++ b/subprojects/cpr.wrap @@ -16,15 +16,16 @@ # under the License. [wrap-file] -directory = cpr-1.12.0 -source_url = https://github.com/libcpr/cpr/archive/1.12.0.tar.gz -source_filename = cpr-1.12.0.tar.gz -source_hash = f64b501de66e163d6a278fbb6a95f395ee873b7a66c905dd785eae107266a709 -patch_filename = cpr_1.12.0-1_patch.zip -patch_url = https://wrapdb.mesonbuild.com/v2/cpr_1.12.0-1/get_patch -patch_hash = 16404431dd8b2dbb49afc78a07b3bbe3c84c9f83ce1f45c3510934fadab99e72 -source_fallback_url = https://github.com/mesonbuild/wrapdb/releases/download/cpr_1.12.0-1/cpr-1.12.0.tar.gz -wrapdb_version = 1.12.0-1 +directory = cpr-1.14.1 +source_url = https://github.com/libcpr/cpr/archive/1.14.1.tar.gz +source_filename = cpr-1.14.1.tar.gz +source_hash = 213ccc7c98683d2ca6304d9760005effa12ec51d664bababf114566cb2b1e23c +source_fallback_url = https://wrapdb.mesonbuild.com/v2/cpr_1.14.1-1/get_source/cpr-1.14.1.tar.gz +patch_filename = cpr_1.14.1-1_patch.zip +patch_url = https://wrapdb.mesonbuild.com/v2/cpr_1.14.1-1/get_patch +patch_fallback_url = https://github.com/mesonbuild/wrapdb/releases/download/cpr_1.14.1-1/cpr_1.14.1-1_patch.zip +patch_hash = e5930186aa8cfb9383a468a80d177b3a4c4fcc5f38deb6fca13d96263ce36459 +wrapdb_version = 1.14.1-1 [provide] -cpr = cpr_dep +dependency_names = cpr