diff --git a/src/iceberg/catalog/rest/http_client.cc b/src/iceberg/catalog/rest/http_client.cc index 84d458b98..41be14cec 100644 --- a/src/iceberg/catalog/rest/http_client.cc +++ b/src/iceberg/catalog/rest/http_client.cc @@ -134,41 +134,9 @@ Status HandleFailureResponse(const cpr::Response& response, } // namespace -void HttpClient::PrepareSession( - const std::string& path, HttpMethod method, - const std::unordered_map& params, - const std::unordered_map& headers) { - session_->SetUrl(cpr::Url{path}); - session_->SetParameters(GetParameters(params)); - session_->RemoveContent(); - // clear lingering POST mode state from prior requests. CURLOPT_POST is implicitly set - // to 1 by POST requests, and this state is not reset by RemoveContent(), so we must - // manually enforce HTTP GET to clear it. - curl_easy_setopt(session_->GetCurlHolder()->handle, CURLOPT_HTTPGET, 1L); - switch (method) { - case HttpMethod::kGet: - session_->PrepareGet(); - break; - case HttpMethod::kPost: - session_->PreparePost(); - break; - case HttpMethod::kPut: - session_->PreparePut(); - break; - case HttpMethod::kDelete: - session_->PrepareDelete(); - break; - case HttpMethod::kHead: - session_->PrepareHead(); - break; - } - auto final_headers = MergeHeaders(default_headers_, headers); - session_->SetHeader(final_headers); -} - HttpClient::HttpClient(std::unordered_map default_headers) : default_headers_{std::move(default_headers)}, - session_{std::make_unique()} { + connection_pool_{std::make_unique()} { // Set default Content-Type for all requests (including GET/HEAD/DELETE). // Many systems require that content type is set regardless and will fail, // even on an empty bodied request. @@ -182,12 +150,9 @@ Result HttpClient::Get( const std::string& path, const std::unordered_map& params, const std::unordered_map& headers, const ErrorHandler& error_handler) { - cpr::Response response; - { - std::lock_guard guard(session_mutex_); - PrepareSession(path, HttpMethod::kGet, params, headers); - response = session_->Get(); - } + auto final_headers = MergeHeaders(default_headers_, headers); + cpr::Response response = + cpr::Get(cpr::Url{path}, GetParameters(params), final_headers, *connection_pool_); ICEBERG_RETURN_UNEXPECTED(HandleFailureResponse(response, error_handler)); HttpResponse http_response; @@ -199,13 +164,9 @@ Result HttpClient::Post( const std::string& path, const std::string& body, const std::unordered_map& headers, const ErrorHandler& error_handler) { - cpr::Response response; - { - std::lock_guard guard(session_mutex_); - PrepareSession(path, HttpMethod::kPost, /*params=*/{}, headers); - session_->SetBody(cpr::Body{body}); - response = session_->Post(); - } + auto final_headers = MergeHeaders(default_headers_, headers); + cpr::Response response = + cpr::Post(cpr::Url{path}, cpr::Body{body}, final_headers, *connection_pool_); ICEBERG_RETURN_UNEXPECTED(HandleFailureResponse(response, error_handler)); HttpResponse http_response; @@ -218,25 +179,16 @@ Result HttpClient::PostForm( const std::unordered_map& form_data, const std::unordered_map& headers, const ErrorHandler& error_handler) { - cpr::Response response; - - { - std::lock_guard guard(session_mutex_); - - // Override default Content-Type (application/json) with form-urlencoded - auto form_headers = headers; - form_headers[kHeaderContentType] = kMimeTypeFormUrlEncoded; - - PrepareSession(path, HttpMethod::kPost, /*params=*/{}, form_headers); - std::vector pair_list; - pair_list.reserve(form_data.size()); - for (const auto& [key, val] : form_data) { - pair_list.emplace_back(key, val); - } - session_->SetPayload(cpr::Payload(pair_list.begin(), pair_list.end())); - - response = session_->Post(); + auto final_headers = MergeHeaders(default_headers_, headers); + final_headers.insert_or_assign(kHeaderContentType, kMimeTypeFormUrlEncoded); + std::vector pair_list; + pair_list.reserve(form_data.size()); + for (const auto& [key, val] : form_data) { + pair_list.emplace_back(key, val); } + cpr::Response response = + cpr::Post(cpr::Url{path}, cpr::Payload(pair_list.begin(), pair_list.end()), + final_headers, *connection_pool_); ICEBERG_RETURN_UNEXPECTED(HandleFailureResponse(response, error_handler)); HttpResponse http_response; @@ -247,12 +199,8 @@ Result HttpClient::PostForm( Result HttpClient::Head( const std::string& path, const std::unordered_map& headers, const ErrorHandler& error_handler) { - cpr::Response response; - { - std::lock_guard guard(session_mutex_); - PrepareSession(path, HttpMethod::kHead, /*params=*/{}, headers); - response = session_->Head(); - } + auto final_headers = MergeHeaders(default_headers_, headers); + cpr::Response response = cpr::Head(cpr::Url{path}, final_headers, *connection_pool_); ICEBERG_RETURN_UNEXPECTED(HandleFailureResponse(response, error_handler)); HttpResponse http_response; @@ -264,12 +212,9 @@ Result HttpClient::Delete( const std::string& path, const std::unordered_map& params, const std::unordered_map& headers, const ErrorHandler& error_handler) { - cpr::Response response; - { - std::lock_guard guard(session_mutex_); - PrepareSession(path, HttpMethod::kDelete, params, headers); - response = session_->Delete(); - } + auto final_headers = MergeHeaders(default_headers_, headers); + cpr::Response response = cpr::Delete(cpr::Url{path}, GetParameters(params), + final_headers, *connection_pool_); ICEBERG_RETURN_UNEXPECTED(HandleFailureResponse(response, error_handler)); HttpResponse http_response; diff --git a/src/iceberg/catalog/rest/http_client.h b/src/iceberg/catalog/rest/http_client.h index 84f8e5906..38f902e4d 100644 --- a/src/iceberg/catalog/rest/http_client.h +++ b/src/iceberg/catalog/rest/http_client.h @@ -21,11 +21,9 @@ #include #include -#include #include #include -#include "iceberg/catalog/rest/endpoint.h" #include "iceberg/catalog/rest/iceberg_rest_export.h" #include "iceberg/catalog/rest/type_fwd.h" #include "iceberg/result.h" @@ -34,7 +32,7 @@ /// \brief Http client for Iceberg REST API. namespace cpr { -class Session; +class ConnectionPool; } // namespace cpr namespace iceberg::rest { @@ -110,16 +108,8 @@ class ICEBERG_REST_EXPORT HttpClient { const ErrorHandler& error_handler); private: - void PrepareSession(const std::string& path, HttpMethod method, - const std::unordered_map& params, - const std::unordered_map& headers); - std::unordered_map default_headers_; - - // TODO(Li Feiyang): use connection pool to support external multi-threaded concurrent - // calls - std::unique_ptr session_; - mutable std::mutex session_mutex_; + std::unique_ptr connection_pool_; }; } // namespace iceberg::rest