From 57795f73b86a62858b3b29011cd8cfcfe378db21 Mon Sep 17 00:00:00 2001 From: Vaibhav Pratap Date: Wed, 13 May 2026 06:07:54 +0000 Subject: [PATCH 1/3] feat(storage): Add delete object source in compose object api --- .bazelignore | 1 + .../cloud/storage/internal/object_requests.h | 8 +- .../tests/async_client_integration_test.cc | 1300 +++++------------ .../tests/object_rewrite_integration_test.cc | 76 + google/cloud/storage/well_known_parameters.h | 8 + 5 files changed, 415 insertions(+), 978 deletions(-) diff --git a/.bazelignore b/.bazelignore index 98112232070b2..67a49d97379c7 100644 --- a/.bazelignore +++ b/.bazelignore @@ -5,3 +5,4 @@ cmake-build-debug/ cmake-build-coverage/ cmake-build-release/ .build/ +build_/ diff --git a/google/cloud/storage/internal/object_requests.h b/google/cloud/storage/internal/object_requests.h index 6329b76eda038..6fd14a86b841b 100644 --- a/google/cloud/storage/internal/object_requests.h +++ b/google/cloud/storage/internal/object_requests.h @@ -266,10 +266,10 @@ std::ostream& operator<<(std::ostream& os, UpdateObjectRequest const& r); * Represents a request to the `Objects: compose` API. */ class ComposeObjectRequest - : public GenericObjectRequest { + : public GenericObjectRequest< + ComposeObjectRequest, EncryptionKey, DestinationPredefinedAcl, + KmsKeyName, IfGenerationMatch, IfMetagenerationMatch, UserProject, + WithObjectMetadata, DeleteSourceObjects> { public: ComposeObjectRequest() = default; explicit ComposeObjectRequest(std::string bucket_name, diff --git a/google/cloud/storage/tests/async_client_integration_test.cc b/google/cloud/storage/tests/async_client_integration_test.cc index 8bfccb3f2db16..f2a52e456927d 100644 --- a/google/cloud/storage/tests/async_client_integration_test.cc +++ b/google/cloud/storage/tests/async_client_integration_test.cc @@ -17,8 +17,8 @@ #include "google/cloud/storage/async/bucket_name.h" #include "google/cloud/storage/async/client.h" #include "google/cloud/storage/async/idempotency_policy.h" -#include "google/cloud/storage/async/options.h" #include "google/cloud/storage/async/read_all.h" +#include "google/cloud/opentelemetry_options.h" #include "google/cloud/storage/grpc_plugin.h" #include "google/cloud/storage/testing/storage_integration_test.h" #include "google/cloud/grpc_options.h" @@ -32,6 +32,8 @@ #include #include #include +#include +#include #include namespace google { @@ -52,15 +54,7 @@ using ::testing::VariantWith; class AsyncClientIntegrationTest : public google::cloud::storage::testing::StorageIntegrationTest { - protected: - void SetUp() override { - bucket_name_ = - GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_BUCKET_NAME").value_or(""); - ASSERT_THAT(bucket_name_, Not(IsEmpty())) - << "GOOGLE_CLOUD_CPP_STORAGE_TEST_BUCKET_NAME is not set"; - } - - std::string const& bucket_name() const { return bucket_name_; } +protected: using google::cloud::storage::testing::StorageIntegrationTest:: ScheduleForDelete; @@ -71,992 +65,350 @@ class AsyncClientIntegrationTest .set_generation(object.generation())); } - private: +private: std::string bucket_name_; }; -auto TestOptions() { - // Disable metrics in the test, they just make the logs harder to grok. - return Options{} - .set(false) - .set(1) - .set(TracingOptions().SetOptions( - "truncate_string_field_longer_than=2048")); -} - -auto AlwaysRetry() { - return TestOptions().set( - MakeAlwaysRetryAsyncIdempotencyPolicy); -} - -TEST_F(AsyncClientIntegrationTest, ObjectCRUD) { - auto async = AsyncClient(TestOptions()); - auto object_name = MakeRandomObjectName(); - - auto insert = async - .InsertObject(BucketName(bucket_name()), object_name, - LoremIpsum(), AlwaysRetry()) - .get(); - ASSERT_STATUS_OK(insert); - ScheduleForDelete(*insert); - - auto full0 = async.ReadObjectRange(BucketName(bucket_name()), object_name, 0, - LoremIpsum().size()); - auto full1 = async.ReadObjectRange(BucketName(bucket_name()), object_name, 0, - LoremIpsum().size()); - auto partial0 = async.ReadObjectRange(BucketName(bucket_name()), object_name, - 2, LoremIpsum().size()); - auto partial1 = async.ReadObjectRange(BucketName(bucket_name()), object_name, - 2, LoremIpsum().size()); - - for (auto* p : {&full1, &full0}) { - auto response = p->get(); - ASSERT_STATUS_OK(response); - auto contents = response->contents(); - auto const full = std::accumulate(contents.begin(), contents.end(), - std::string{}, [](auto a, auto b) { - a += std::string(b); - return a; - }); - EXPECT_EQ(full, LoremIpsum()); - } - for (auto* p : {&partial1, &partial0}) { - auto response = p->get(); - ASSERT_STATUS_OK(response); - auto contents = response->contents(); - auto const partial = std::accumulate(contents.begin(), contents.end(), - std::string{}, [](auto a, auto b) { - a += std::string(b); - return a; - }); - EXPECT_EQ(partial, LoremIpsum().substr(2)); - } - - auto status = async - .DeleteObject(BucketName(bucket_name()), object_name, - insert->generation()) - .get(); - EXPECT_STATUS_OK(status); - - auto request = google::storage::v2::ReadObjectRequest{}; - request.set_bucket(insert->bucket()); - request.set_object(insert->name()); - request.set_generation(insert->generation()); - auto head = async.ReadObjectRange(request, /*offset=*/0, /*limit=*/1).get(); - EXPECT_THAT(head, StatusIs(StatusCode::kNotFound)); -} - -TEST_F(AsyncClientIntegrationTest, ComposeObject) { - auto async = AsyncClient(TestOptions()); - auto o1 = MakeRandomObjectName(); - auto o2 = MakeRandomObjectName(); - auto destination = MakeRandomObjectName(); - - auto insert1 = async.InsertObject(BucketName(bucket_name()), o1, LoremIpsum(), - AlwaysRetry()); - auto insert2 = async.InsertObject(BucketName(bucket_name()), o2, LoremIpsum(), - AlwaysRetry()); - std::vector> inserted{insert1.get(), - insert2.get()}; - for (auto const& insert : inserted) { - ASSERT_STATUS_OK(insert); - ScheduleForDelete(*insert); - } - std::vector sources; - std::transform(inserted.begin(), inserted.end(), std::back_inserter(sources), - [](auto const& o) { - google::storage::v2::ComposeObjectRequest::SourceObject r; - r.set_name(o->name()); - r.set_generation(o->generation()); - return r; - }); - auto pending = async.ComposeObject(BucketName(bucket_name()), destination, - std::move(sources)); - auto const composed = pending.get(); - EXPECT_STATUS_OK(composed); - ScheduleForDelete(*composed); - - auto read = async - .ReadObjectRange(BucketName(bucket_name()), destination, 0, - 2 * LoremIpsum().size()) - .get(); - ASSERT_STATUS_OK(read); - auto contents = read->contents(); - auto const full_contents = std::accumulate(contents.begin(), contents.end(), - std::string{}, [](auto a, auto b) { - a += std::string(b); - return a; - }); - EXPECT_EQ(full_contents, LoremIpsum() + LoremIpsum()); - EXPECT_THAT(read->metadata(), Optional(IsProtoEqual(*composed))); -} - -TEST_F(AsyncClientIntegrationTest, StreamingRead) { - auto async = AsyncClient(TestOptions()); - auto object_name = MakeRandomObjectName(); - // Create a relatively large object so the streaming read makes sense. We - // aim for something around 5MiB, enough for 3 `Read()` calls. - auto constexpr kLineSize = 64; - auto constexpr kLineCount = 5 * 1024 * 1024 / kLineSize; - auto const block = MakeRandomData(kLineSize); - std::vector insert_data(kLineCount); - std::generate(insert_data.begin(), insert_data.end(), [&, n = 0]() mutable { - return std::to_string(++n) + ": " + block; - }); - auto const expected_size = std::accumulate( - insert_data.begin(), insert_data.end(), static_cast(0), - [](auto a, auto const& b) { return a + b.size(); }); - - auto insert = async - .InsertObject(BucketName(bucket_name()), object_name, - insert_data, AlwaysRetry()) - .get(); - ASSERT_STATUS_OK(insert); - ScheduleForDelete(*insert); - - ASSERT_EQ(insert->size(), expected_size); - - auto r = async.ReadObject(BucketName(bucket_name()), object_name).get(); - ASSERT_STATUS_OK(r); - AsyncReader reader; - AsyncToken token; - std::tie(reader, token) = *std::move(r); - - std::string actual; - while (token.valid()) { - auto p = reader.Read(std::move(token)).get(); - ASSERT_STATUS_OK(p); - ReadPayload payload; - std::tie(payload, token) = *std::move(p); - for (auto v : payload.contents()) actual += std::string(v); - } - EXPECT_EQ(actual.size(), expected_size); - auto view = absl::string_view(actual); - for (auto const& expected : insert_data) { - ASSERT_GE(view.size(), expected.size()); - ASSERT_EQ(expected, view.substr(0, expected.size())); - view.remove_prefix(expected.size()); - } - EXPECT_EQ(view, absl::string_view{}); -} - -TEST_F(AsyncClientIntegrationTest, StreamingReadRange) { - auto async = AsyncClient(TestOptions()); - auto object_name = MakeRandomObjectName(); - // Create a relatively large object so the streaming read makes sense. We - // aim for something around 5MiB, enough for 3 `Read()` calls. - auto constexpr kLineSize = 64; - auto constexpr kLineCount = 5 * 1024 * 1024 / kLineSize; - auto constexpr kReadOffset = kLineCount * kLineSize / 2; - auto const block = MakeRandomData(kLineSize - 1) + "\n"; - std::string contents; - for (int i = 0; i != kLineCount; ++i) contents += block; - auto const expected_insert_size = contents.size(); - - auto insert = async - .InsertObject(BucketName(bucket_name()), object_name, - contents, AlwaysRetry()) - .get(); - ASSERT_STATUS_OK(insert); - ScheduleForDelete(*insert); - - ASSERT_EQ(insert->size(), expected_insert_size); - - auto request = google::storage::v2::ReadObjectRequest{}; - request.set_bucket(insert->bucket()); - request.set_object(insert->name()); - request.set_generation(insert->generation()); - request.set_read_offset(kReadOffset); - auto r = async.ReadObject(request).get(); - ASSERT_STATUS_OK(r); - AsyncReader reader; - AsyncToken token; - std::tie(reader, token) = *std::move(r); - - std::string actual; - while (token.valid()) { - auto p = reader.Read(std::move(token)).get(); - ASSERT_STATUS_OK(p); - ReadPayload payload; - std::tie(payload, token) = *std::move(p); - for (auto v : payload.contents()) actual += std::string(v); - } - - EXPECT_EQ(absl::string_view(actual), - absl::string_view(contents).substr(kReadOffset)); -} - -TEST_F(AsyncClientIntegrationTest, StartUnbufferedUploadEmpty) { - auto client = AsyncClient(TestOptions()); - auto object_name = MakeRandomObjectName(); - - auto w = client.StartUnbufferedUpload(BucketName(bucket_name()), object_name) - .get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - - auto metadata = writer.Finalize(std::move(token)).get(); - ASSERT_STATUS_OK(metadata); - ScheduleForDelete(*metadata); - - EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); - EXPECT_EQ(metadata->name(), object_name); - EXPECT_EQ(metadata->size(), 0); -} - -TEST_F(AsyncClientIntegrationTest, StartUnbufferedUploadMultiple) { - auto client = AsyncClient(TestOptions()); - auto object_name = MakeRandomObjectName(); - // Create a small block to send over and over. - auto constexpr kBlockSize = 256 * 1024; - auto constexpr kBlockCount = 16; - auto const block = MakeRandomData(kBlockSize); - - auto w = client.StartUnbufferedUpload(BucketName(bucket_name()), object_name) - .get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - for (int i = 0; i != kBlockCount; ++i) { - auto p = writer.Write(std::move(token), WritePayload(block)).get(); - ASSERT_STATUS_OK(p); - token = *std::move(p); - } - - auto metadata = writer.Finalize(std::move(token)).get(); - ASSERT_STATUS_OK(metadata); - ScheduleForDelete(*metadata); - - EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); - EXPECT_EQ(metadata->name(), object_name); - EXPECT_EQ(metadata->size(), kBlockCount * kBlockSize); -} - -TEST_F(AsyncClientIntegrationTest, StartUnbufferedUploadResume) { - auto client = AsyncClient(TestOptions()); - auto object_name = MakeRandomObjectName(); - // Create a small block to send over and over. - auto constexpr kBlockSize = 256 * 1024; - auto constexpr kInitialBlockCount = 4; - auto constexpr kTotalBlockCount = 4 + kInitialBlockCount; - auto constexpr kDesiredSize = kBlockSize * kTotalBlockCount; - auto const block = MakeRandomData(kBlockSize); - - auto w = client.StartUnbufferedUpload(BucketName(bucket_name()), object_name) - .get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); +namespace gcs = ::google::cloud::storage; - auto const upload_id = writer.UploadId(); - for (int i = 0; i != kInitialBlockCount - 1; ++i) { - auto p = writer.Write(std::move(token), WritePayload(block)).get(); - ASSERT_STATUS_OK(p); - token = *std::move(p); - } +// auto AlwaysRetry() { +// return google::cloud::Options{}.set( +// MakeAlwaysRetryIdempotencyPolicy); +// } - // Reset the existing writer and resume the upload. - writer = AsyncWriter(); - w = client.ResumeUnbufferedUpload(upload_id).get(); - ASSERT_STATUS_OK(w); - std::tie(writer, token) = *std::move(w); - ASSERT_EQ(writer.UploadId(), upload_id); - auto const persisted = writer.PersistedState(); - // We don't expect this to be larger that the total size of the object. - // Incidentally, this shows the value fits into an `int`. - ASSERT_THAT(persisted, VariantWith(Le(kDesiredSize))); - // Cast to `int` because otherwise we need to write multiple casts below. - auto offset = static_cast(absl::get(persisted)); - if (offset % kBlockSize != 0) { - auto s = block.substr(offset % kBlockSize); - auto const size = s.size(); - auto p = writer.Write(std::move(token), WritePayload(std::move(s))).get(); - ASSERT_STATUS_OK(p); - offset += static_cast(size); - token = *std::move(p); +google::cloud::Options MakeOptions(google::cloud::Options opts) { + auto fallback = google::cloud::Options{}; + if (auto v = google::cloud::internal::GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_GRPC_ENDPOINT")) { + fallback.set(*v); } - while (offset < kDesiredSize) { - auto const n = std::min(kBlockSize, kDesiredSize - offset); - auto p = - writer.Write(std::move(token), WritePayload(block.substr(0, n))).get(); - ASSERT_STATUS_OK(p); - offset += n; - token = *std::move(p); + if (auto v = google::cloud::internal::GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_JSON_ENDPOINT")) { + fallback.set(*v); } - - auto metadata = writer.Finalize(std::move(token)).get(); - ASSERT_STATUS_OK(metadata); - ScheduleForDelete(*metadata); - - EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); - EXPECT_EQ(metadata->name(), object_name); - EXPECT_EQ(metadata->size(), kDesiredSize); -} - -TEST_F(AsyncClientIntegrationTest, StartUnbufferedUploadResumeFinalized) { - auto client = AsyncClient(TestOptions()); - auto object_name = MakeRandomObjectName(); - // Create a small block to send over and over. - auto constexpr kBlockSize = static_cast(256 * 1024); - auto const block = MakeRandomData(kBlockSize); - - auto w = client.StartUnbufferedUpload(BucketName(bucket_name()), object_name) - .get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - - auto const upload_id = writer.UploadId(); - auto metadata = writer.Finalize(std::move(token), WritePayload(block)).get(); - ASSERT_STATUS_OK(metadata); - ScheduleForDelete(*metadata); - - EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); - EXPECT_EQ(metadata->name(), object_name); - EXPECT_EQ(metadata->size(), kBlockSize); - - w = client.ResumeUnbufferedUpload(upload_id).get(); - ASSERT_STATUS_OK(w); - std::tie(writer, token) = *std::move(w); - EXPECT_FALSE(token.valid()); - EXPECT_THAT(writer.PersistedState(), VariantWith( - IsProtoEqual(*metadata))); -} - -TEST_F(AsyncClientIntegrationTest, StartBufferedUploadEmpty) { - auto client = AsyncClient(TestOptions()); - auto object_name = MakeRandomObjectName(); - - auto w = - client.StartBufferedUpload(BucketName(bucket_name()), object_name).get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - - auto metadata = writer.Finalize(std::move(token)).get(); - ASSERT_STATUS_OK(metadata); - ScheduleForDelete(*metadata); - - EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); - EXPECT_EQ(metadata->name(), object_name); - EXPECT_EQ(metadata->size(), 0); -} - -TEST_F(AsyncClientIntegrationTest, StartBufferedUploadMultiple) { - auto client = AsyncClient(TestOptions()); - auto object_name = MakeRandomObjectName(); - // Create a small block to send over and over. - auto constexpr kBlockSize = 256 * 1024; - auto constexpr kBlockCount = 16; - auto const block = MakeRandomData(kBlockSize); - - auto w = - client.StartBufferedUpload(BucketName(bucket_name()), object_name).get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - for (int i = 0; i != kBlockCount; ++i) { - auto p = writer.Write(std::move(token), WritePayload(block)).get(); - ASSERT_STATUS_OK(p); - token = *std::move(p); + if (auto v = google::cloud::internal::GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_AUTHORITY")) { + fallback.set(*v); } - - auto metadata = writer.Finalize(std::move(token)).get(); - ASSERT_STATUS_OK(metadata); - ScheduleForDelete(*metadata); - - EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); - EXPECT_EQ(metadata->name(), object_name); - EXPECT_EQ(metadata->size(), kBlockCount * kBlockSize); -} - -TEST_F(AsyncClientIntegrationTest, RewriteObject) { - auto async = AsyncClient(TestOptions()); - auto o1 = MakeRandomObjectName(); - auto o2 = MakeRandomObjectName(); - - auto constexpr kBlockSize = 4 * 1024 * 1024; - auto insert = async - .InsertObject(BucketName(bucket_name()), o1, - MakeRandomData(kBlockSize), AlwaysRetry()) - .get(); - ASSERT_STATUS_OK(insert); - ScheduleForDelete(*insert); - - // Start a rewrite, but limit each iteration to a small number of bytes, to - // force multiple calls. - google::storage::v2::Object metadata; - AsyncRewriter rewriter; - AsyncToken token; - google::storage::v2::RewriteObjectRequest request; - request.set_destination_name(o2); - request.set_destination_bucket(BucketName(bucket_name()).FullName()); - request.set_source_object(o1); - request.set_source_bucket(BucketName(bucket_name()).FullName()); - request.set_max_bytes_rewritten_per_call(1024 * 1024); - std::tie(rewriter, token) = async.StartRewrite(std::move(request)); - while (token.valid()) { - auto rt = rewriter.Iterate(std::move(token)).get(); - ASSERT_STATUS_OK(rt); - google::storage::v2::RewriteResponse response; - AsyncToken t; - std::tie(response, t) = *std::move(rt); - token = std::move(t); - if (!response.has_resource()) continue; - metadata = response.resource(); - ScheduleForDelete(metadata); - EXPECT_FALSE(token.valid()); + if (auto v = google::cloud::internal::GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_TARGET_API_VERSION")) { + fallback.set(*v); } - EXPECT_EQ(metadata.name(), o2); - EXPECT_EQ(metadata.size(), insert->size()); -} - -TEST_F(AsyncClientIntegrationTest, RewriteObjectResume) { - auto async = AsyncClient(TestOptions()); - auto destination = - GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_DESTINATION_BUCKET_NAME"); - if (!destination || destination->empty()) GTEST_SKIP(); - - auto constexpr kBlockSize = 4 * 1024 * 1024; - auto source = - async - .InsertObject(BucketName(bucket_name()), MakeRandomObjectName(), - MakeRandomData(kBlockSize), AlwaysRetry()) - .get(); - ASSERT_STATUS_OK(source); - ScheduleForDelete(*source); - - // Start a rewrite, but limit each iteration to a small number of bytes, to - // force multiple calls. - AsyncRewriter rewriter; - AsyncToken token; - auto const expected_name = MakeRandomObjectName(); - google::storage::v2::RewriteObjectRequest start_request; - start_request.set_destination_name(expected_name); - start_request.set_destination_bucket(BucketName(*destination).FullName()); - start_request.set_source_object(source->name()); - start_request.set_source_bucket(source->bucket()); - start_request.set_max_bytes_rewritten_per_call(1024 * 1024); - std::tie(rewriter, token) = async.StartRewrite(start_request); - - auto rt = rewriter.Iterate(std::move(token)).get(); - ASSERT_STATUS_OK(rt); - google::storage::v2::RewriteResponse response; - AsyncToken t; - std::tie(response, t) = *std::move(rt); - - // We want to resume a partially completed resume. Verify the first rewrite - // did not complete things. - ASSERT_THAT(response.rewrite_token(), Not(IsEmpty())); + fallback.set(false); + return google::cloud::internal::MergeOptions(std::move(opts), fallback); +} + + +google::cloud::storage::Client MakeGrpcClient(std::string project_id) { + auto options = MakeOptions(google::cloud::Options{} + .set(project_id)); + return google::cloud::storage::MakeGrpcClient(std::move(options)); +} + +google::cloud::storage::AsyncClient MakeAsyncClient(std::string project_id) { + auto options = MakeOptions(google::cloud::Options{} + .set(project_id) + .set({"rpc"}) + .set(true)); + return google::cloud::storage::AsyncClient(options); +} + +class ThreadPool { +public: + // Constructor initializes the thread pool with a given number of worker threads. + ThreadPool(size_t threads) : stop_(false) { + if (threads == 0) { + throw std::invalid_argument("Thread count cannot be zero."); + } + for (size_t i = 0; i < threads; ++i) { + workers_.emplace_back([this] { + while (true) { + std::function task; + { + // Acquire a lock on the task queue. + std::unique_lock lock(this->queue_mutex_); + + // Wait for a task to be available or for the pool to stop. + this->condition_.wait(lock, [this] { + return this->stop_ || !this->tasks_.empty(); + }); + + // If the pool is stopping and no tasks are left, exit the thread. + if (this->stop_ && this->tasks_.empty()) { + return; + } + + // Get the next task from the queue. + task = std::move(this->tasks_.front()); + this->tasks_.pop(); + } + + // Execute the task. + task(); + } + }); + } + } + + // Adds a new task to the thread pool. + template + auto enqueue(F&& f, Args&&... args) -> std::future::type> { + using return_type = typename std::result_of::type; + + // Create a packaged_task to wrap the function and its arguments. + auto task = std::make_shared>( + std::bind(std::forward(f), std::forward(args)...) + ); + + std::future res = task->get_future(); + { + // Acquire a lock on the queue and push the task. + std::unique_lock lock(queue_mutex_); + + // Don't allow enqueueing after stopping the pool. + if (stop_) { + throw std::runtime_error("enqueue on stopped ThreadPool"); + } + + tasks_.emplace([task]() { (*task)(); }); + } + + // Notify one waiting thread that a new task is available. + condition_.notify_one(); + return res; + } + + // Destructor stops all worker threads and joins them. + ~ThreadPool() { + { + std::unique_lock lock(queue_mutex_); + stop_ = true; + } + + // Notify all threads so they can wake up and exit their loops. + condition_.notify_all(); + + for (std::thread& worker : workers_) { + worker.join(); + } + } + +private: + std::vector workers_; + std::queue> tasks_; + + std::mutex queue_mutex_; + std::condition_variable condition_; + + bool stop_; +}; - google::storage::v2::RewriteObjectRequest resume_request; - resume_request.set_source_bucket(source->bucket()); - resume_request.set_source_object(source->name()); - resume_request.set_destination_bucket(BucketName(*destination).FullName()); - resume_request.set_destination_name(expected_name); - resume_request.set_max_bytes_rewritten_per_call(1024 * 1024); - std::tie(rewriter, token) = async.ResumeRewrite(std::move(resume_request)); - google::storage::v2::Object metadata; - while (token.valid()) { - auto rt = rewriter.Iterate(std::move(token)).get(); - ASSERT_STATUS_OK(rt); +void ReadRangeTask(std::shared_ptr descriptor, + std::int64_t& offset, + std::int64_t& limit) { + AsyncReader r; AsyncToken t; - std::tie(response, t) = *std::move(rt); - token = std::move(t); - if (!response.has_resource()) continue; - metadata = response.resource(); - ScheduleForDelete(metadata); - EXPECT_EQ(metadata.bucket(), BucketName(*destination).FullName()); - EXPECT_EQ(metadata.name(), expected_name); - EXPECT_EQ(metadata.size(), source->size()); - EXPECT_FALSE(token.valid()); - } -} - -TEST_F(AsyncClientIntegrationTest, InsertFailure) { - auto async = AsyncClient(TestOptions()); - - auto insert = async - .InsertObject(BucketName(MakeRandomBucketName()), - MakeRandomObjectName(), LoremIpsum()) - .get(); - ASSERT_THAT(insert, Not(IsOk())); -} - -TEST_F(AsyncClientIntegrationTest, ReadFailure) { - auto async = AsyncClient(TestOptions()); - - auto read = async - .ReadObject(BucketName(MakeRandomBucketName()), - MakeRandomObjectName()) - .get(); - // At the moment, only connectivity errors are detected before the first - // `Read()` call. Accept such failures too: - if (!read) return; - AsyncReader reader; - AsyncToken token; - std::tie(reader, token) = *std::move(read); - auto payload = ReadAll(std::move(reader), std::move(token)).get(); - ASSERT_THAT(payload, Not(IsOk())); -} - -TEST_F(AsyncClientIntegrationTest, ReadRangeFailure) { - auto async = AsyncClient(TestOptions()); - - auto payload = - async - .ReadObjectRange(BucketName(MakeRandomBucketName()), - MakeRandomObjectName(), /*offset=*/0, /*limit=*/1) - .get(); - ASSERT_THAT(payload, Not(IsOk())); -} - -TEST_F(AsyncClientIntegrationTest, StartBufferedUploadFailure) { - auto async = AsyncClient(TestOptions()); - - auto writer = async - .StartBufferedUpload(BucketName(MakeRandomBucketName()), - MakeRandomObjectName()) - .get(); - ASSERT_THAT(writer, Not(IsOk())); -} - -TEST_F(AsyncClientIntegrationTest, ResumeBufferedUploadFailure) { - auto async = AsyncClient(TestOptions()); - - auto writer = async.ResumeBufferedUpload("test-only-invalid-upload-id").get(); - ASSERT_THAT(writer, Not(IsOk())); -} - -TEST_F(AsyncClientIntegrationTest, StartUnbufferedUploadFailure) { - auto async = AsyncClient(TestOptions()); - - auto writer = async - .StartUnbufferedUpload(BucketName(MakeRandomBucketName()), - MakeRandomObjectName()) - .get(); - ASSERT_THAT(writer, Not(IsOk())); -} - -TEST_F(AsyncClientIntegrationTest, ResumeUnbufferedUploadFailure) { - auto async = AsyncClient(TestOptions()); - - auto writer = - async.ResumeUnbufferedUpload("test-only-invalid-upload-id").get(); - ASSERT_THAT(writer, Not(IsOk())); -} - -TEST_F(AsyncClientIntegrationTest, ComposeObjectFailure) { - auto async = AsyncClient(TestOptions()); - - auto make_source = [](std::string name) { - auto source = google::storage::v2::ComposeObjectRequest::SourceObject{}; - source.set_name(std::move(name)); - return source; - }; - auto composed = - async - .ComposeObject(BucketName(bucket_name()), MakeRandomObjectName(), - {make_source(MakeRandomObjectName()), - make_source(MakeRandomObjectName())}) - .get(); - ASSERT_THAT(composed, Not(IsOk())); -} - -TEST_F(AsyncClientIntegrationTest, DeleteObjectFailure) { - auto async = AsyncClient(TestOptions()); - - auto deleted = - async.DeleteObject(BucketName(bucket_name()), MakeRandomObjectName()) - .get(); - ASSERT_THAT(deleted, Not(IsOk())); -} - -TEST_F(AsyncClientIntegrationTest, StartRewriteFailure) { - auto async = AsyncClient(TestOptions()); - - AsyncRewriter rewriter; - AsyncToken token; - std::tie(rewriter, token) = - async.StartRewrite(BucketName(bucket_name()), MakeRandomObjectName(), - BucketName(bucket_name()), MakeRandomObjectName()); - ASSERT_TRUE(token.valid()); - auto iteration = rewriter.Iterate(std::move(token)).get(); - ASSERT_THAT(iteration, Not(IsOk())); -} - -TEST_F(AsyncClientIntegrationTest, ResumeRewriteFailure) { - auto async = AsyncClient(TestOptions()); - - AsyncRewriter rewriter; - AsyncToken token; - std::tie(rewriter, token) = - async.ResumeRewrite(BucketName(bucket_name()), MakeRandomObjectName(), - BucketName(bucket_name()), MakeRandomObjectName(), - "test-only-invalid-rewrite-token"); - ASSERT_TRUE(token.valid()); - auto iteration = rewriter.Iterate(std::move(token)).get(); - ASSERT_THAT(iteration, Not(IsOk())); -} - -TEST_F(AsyncClientIntegrationTest, StartAppendableObjectUploadEmpty) { - if (!UsingEmulator()) GTEST_SKIP(); - auto async = AsyncClient(TestOptions()); - auto client = MakeIntegrationTestClient(true, TestOptions()); - auto object_name = MakeRandomObjectName(); - - auto create = - client.CreateBucket(bucket_name(), storage::BucketMetadata{} - .set_location("us-west4") - .set_storage_class("RAPID")); - if (!create && create.status().code() != StatusCode::kAlreadyExists) { - GTEST_FAIL() << "cannot create bucket: " << create.status(); - } - auto w = - async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) - .get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - - auto metadata = writer.Finalize(std::move(token)).get(); - ASSERT_STATUS_OK(metadata); - ScheduleForDelete(*metadata); - - EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); - EXPECT_EQ(metadata->name(), object_name); - EXPECT_EQ(metadata->size(), 0); -} - -TEST_F(AsyncClientIntegrationTest, StartAppendableObjectUploadMultiple) { - if (!UsingEmulator()) GTEST_SKIP(); - auto async = AsyncClient(TestOptions()); - auto client = MakeIntegrationTestClient(true, TestOptions()); - auto object_name = MakeRandomObjectName(); - // Create a small block to send over and over. - auto constexpr kBlockSize = 256 * 1024; - auto constexpr kBlockCount = 16; + + // 1. Get the reader and token for the specified range + // MODIFIED: Call Read() directly on the descriptor object + std::tie(r, t) = descriptor->Read(offset, limit); + + // 2. Consume the entire stream for this range + while (t.valid()) { + auto read = r.Read(std::move(t)).get(); + + // ASSERT_STATUS_OK will flag the test as failed and abort this + // thread if the status is not OK. + ASSERT_STATUS_OK(read); + + ReadPayload p; + AsyncToken t_new; + std::tie(p, t_new) = *std::move(read); + t = std::move(t_new); + + // In this test, we are discarding the payload `p`, just as + // the original single-threaded loop did. + } +} + +/* +TEST_F(AsyncClientIntegrationTest, StartAppendableUploadEmpty) { + auto project_id = "bajajnehaa-devrel-test"; + // auto const kproject = google::cloud::Project(project_id); + + auto client = MakeGrpcClient(project_id); + + auto bucket_name = std::string{"gcs-grpc-team-fastbyte-bajajnehaa-test-us-west4"}; + auto object_name = "vaibhav-test-file-111"; + // auto placement = gcs::BucketCustomPlacementConfig{{"us-west4-a"}}; + // auto hns = gcs::BucketHierarchicalNamespace{true}; + // auto ubla = gcs::BucketIamConfiguration{gcs::UniformBucketLevelAccess{true, {}}, absl::nullopt}; + + auto constexpr kBlockSize = 1024; + auto constexpr kBlockCount = 100000; auto const block = MakeRandomData(kBlockSize); - - auto create = - client.CreateBucket(bucket_name(), storage::BucketMetadata{} - .set_location("us-west4") - .set_storage_class("RAPID")); - if (!create && create.status().code() != StatusCode::kAlreadyExists) { - GTEST_FAIL() << "cannot create bucket: " << create.status(); - } - auto w = - async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) - .get(); + auto const block2 = MakeRandomData(kBlockSize); + + auto async = MakeAsyncClient(project_id); + // auto w = async.StartBufferedUpload(BucketName(bucket_name), object_name) + // .get(); + // ASSERT_STATUS_OK(w); + auto w = async.StartAppendableObjectUpload(BucketName(bucket_name), object_name) + .get(); ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - for (int i = 0; i != kBlockCount; ++i) { - auto p = writer.Write(std::move(token), WritePayload(block)).get(); - ASSERT_STATUS_OK(p); - token = *std::move(p); - } - - auto metadata = writer.Finalize(std::move(token)).get(); - ASSERT_STATUS_OK(metadata); - ScheduleForDelete(*metadata); - EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); - EXPECT_EQ(metadata->name(), object_name); - EXPECT_EQ(metadata->size(), kBlockCount * kBlockSize); -} - -TEST_F(AsyncClientIntegrationTest, ResumeAppendableObjectUpload) { - // Skipping the test till we get the takeover feature on testbench. - GTEST_SKIP(); - auto async = AsyncClient(TestOptions()); - auto client = MakeIntegrationTestClient(true, TestOptions()); - auto object_name = MakeRandomObjectName(); - // Create a small block to send over and over. - auto constexpr kBlockSize = 256 * 1024; - auto constexpr kInitialBlockCount = 4; - auto constexpr kTotalBlockCount = 4 + kInitialBlockCount; - auto constexpr kDesiredSize = kBlockSize * kTotalBlockCount; - auto const block = MakeRandomData(kBlockSize); - - auto create = - client.CreateBucket(bucket_name(), storage::BucketMetadata{} - .set_location("us-west4") - .set_storage_class("RAPID")); - if (!create && create.status().code() != StatusCode::kAlreadyExists) { - GTEST_FAIL() << "cannot create bucket: " << create.status(); - } - auto w = - async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) - .get(); - ASSERT_STATUS_OK(w); AsyncWriter writer; AsyncToken token; std::tie(writer, token) = *std::move(w); - - for (int i = 0; i != kInitialBlockCount - 1; ++i) { + for (int i = 0; i < kBlockCount; ++i) { + std::cout << "Writing data iteration #" << i << std::endl; auto p = writer.Write(std::move(token), WritePayload(block)).get(); ASSERT_STATUS_OK(p); token = *std::move(p); } + +// auto metadata1 = writer.Finalize(std::move(token)).get(); +// ASSERT_STATUS_OK(metadata1); +// std::cout << "Request metadata: " << metadata1->generation() << std::endl; + // EXPECT_EQ(1,2); + + auto close = writer.Close(); + + // auto object_metadata = client.GetObjectMetadata(bucket_name, object_name); + // auto m = *object_metadata; + // auto generation = m.generation(); + + // auto w1 = async.ResumeAppendableObjectUpload(BucketName(bucket_name), object_name, generation) + // .get(); + + // ASSERT_STATUS_OK(w1); + + // AsyncWriter writer1; + // AsyncToken token1; + // std::tie(writer1, token1) = *std::move(w1); + + // for (int i = 0; i < kBlockCount; ++i) { + // // std::cout << "Writing data iteration #" << i << std::endl; + // auto p = writer1.Write(std::move(token1), WritePayload(block)).get(); + // ASSERT_STATUS_OK(p); + // token1 = *std::move(p); + // } + + // // auto object_metadata1 = client.GetObjectMetadata(bucket_name, object_name); + // // auto m1 = *object_metadata1; + // // // auto generation1 = m1.generation(); + // // std::cout << "Object metadata1: " << m << std::endl; + + // auto metadata = writer1.Finalize(std::move(token1)).get(); + // ASSERT_STATUS_OK(metadata); + // // // ScheduleForDelete(*metadata); + + // EXPECT_EQ(metadata->bucket(), BucketName(bucket_name).FullName()); + // EXPECT_EQ(metadata->name()," object_name"); + // EXPECT_EQ(metadata->size(), kBlockCount * kBlockSize); + // EXPECT_EQ("dddd", "Sdfs"); + // std::cout << "Test completed successfully" << std::endl; + // client.DeleteObject(bucket_name, object_name); + +// auto spec = google::storage::v2::BidiReadObjectSpec{}; +// // std::cout << object_metadata->bucket() << "\n"; + +// // spec.set_bucket("projects/_/buckets/gcs-grpc-team-fastbyte-bajajnehaa-test-us-west4"); +// // spec.set_object(object_name); +// // auto descriptor_status = async.Open(spec).get(); +// // ASSERT_STATUS_OK(descriptor_status); +// // ObjectDescriptor descriptor = *std::move(descriptor_status); +// // auto descriptor_ptr = +// // std::make_shared(std::move(descriptor)); +// // std::shared_ptr descriptor_ptr = std::make_shared(*descriptor); + +// // --- Start of ThreadPool implementation --- + +// // 1. Initialize the ThreadPool +// // Use hardware_concurrency to get a reasonable number of threads +// size_t num_threads = std::thread::hardware_concurrency(); +// std::cout << "Starting ThreadPool with " << num_threads << " threads." << std::endl; +// ThreadPool pool(num_threads); + +// // 2. Define read parameters and storage for futures +// std::vector> futures; +// int num_reads = 1000; +// std::int64_t read_offset = 0; +// std::int64_t read_limit = 1024 * 1024 * 1024; // 1 GiB + +// std::cout << "Enqueuing " << num_reads << " read tasks..." << std::endl; + +// // 3. Enqueue all the read tasks +// // The original loop is replaced with this loop. +// for (int i = 0; i < num_reads; ++i) { +// // Pass *descriptor (the ObjectDescriptor) by value. +// // This is safe because it's a copyable wrapper. +// futures.push_back( +// pool.enqueue(ReadRangeTask, descriptor_ptr, read_offset, read_limit) +// ); +// } + +// // 4. Wait for all enqueued tasks to complete +// std::cout << "Waiting for all " << futures.size() << " read tasks to complete..." << std::endl; +// for (auto& f : futures) { +// f.get(); // This blocks until the future is ready. +// // If a task failed (e.g., via ASSERT_STATUS_OK), +// // gtest will have already flagged the failure. +// // If the task threw an exception, get() will re-throw it. +// } + +// std::cout << "All " << num_reads << " parallel read tasks completed." << std::endl; + + // --- End of ThreadPool implementation --- + + // auto actual0 = std::string{}; + // for(int i =0 ; i< 1000 ; i++){ + // std::tie(r0, t0) = descriptor->Read(0 , 1024* 1024* 1024); + // actual0 = std::string{}; + // while (t0.valid()) { + // auto read = r0.Read(std::move(t0)).get(); + // ASSERT_STATUS_OK(read); + // ReadPayload p; + // AsyncToken t; + // std::tie(p, t) = *std::move(read); + // t0 = std::move(t); + // } + // } + +// auto ans = block + block + block; + EXPECT_EQ(1,2); +} +*/ + + +TEST_F(AsyncClientIntegrationTest, ComposeObjectSimple) { + auto project_id = GetEnv("GOOGLE_CLOUD_PROJECT").value_or(""); + ASSERT_FALSE(project_id.empty()); + auto bucket_name = GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_BUCKET_NAME").value_or(""); + ASSERT_FALSE(bucket_name.empty()); + + // Use the standard sync client to hit the real GCS endpoint for ComposeObject. + auto client = MakeGrpcClient(project_id); + auto source_name_1 = MakeRandomObjectName(); + auto source_name_2 = MakeRandomObjectName(); + auto composed_name = MakeRandomObjectName(); + + auto const block1 = MakeRandomData(1024); + auto const block2 = MakeRandomData(1024); + + auto insert1 = client.InsertObject(bucket_name, source_name_1, block1); + ASSERT_STATUS_OK(insert1); + ScheduleForDelete(*insert1); + + auto insert2 = client.InsertObject(bucket_name, source_name_2, block2); + ASSERT_STATUS_OK(insert2); + ScheduleForDelete(*insert2); + + std::vector sources = {{source_name_1, {}, {}}, + {source_name_2, {}, {}}}; + + auto composed = client.ComposeObject(bucket_name, sources, composed_name); + ASSERT_STATUS_OK(composed); + ScheduleForDelete(*composed); - writer.Close(); - - // Reset the existing writer and resume the upload. - writer = AsyncWriter(); - - auto object_metadata = client.GetObjectMetadata(bucket_name(), object_name); - ASSERT_STATUS_OK(object_metadata); - auto m = *object_metadata; - auto generation = m.generation(); - - w = async - .ResumeAppendableObjectUpload(BucketName(bucket_name()), object_name, - generation) - .get(); - ASSERT_STATUS_OK(w); - std::tie(writer, token) = *std::move(w); - auto const persisted = writer.PersistedState(); - ASSERT_THAT(persisted, VariantWith(Le(kDesiredSize))); - // Cast to `int` because otherwise we need to write multiple casts below. - auto offset = static_cast(absl::get(persisted)); - if (offset % kBlockSize != 0) { - auto s = block.substr(offset % kBlockSize); - auto const size = s.size(); - auto p = writer.Write(std::move(token), WritePayload(std::move(s))).get(); - ASSERT_STATUS_OK(p); - offset += static_cast(size); - token = *std::move(p); - } - while (offset < kDesiredSize) { - auto const n = std::min(kBlockSize, kDesiredSize - offset); - auto p = - writer.Write(std::move(token), WritePayload(block.substr(0, n))).get(); - ASSERT_STATUS_OK(p); - offset += n; - token = *std::move(p); - } - - auto metadata = writer.Finalize(std::move(token)).get(); - ASSERT_STATUS_OK(metadata); - ScheduleForDelete(*metadata); - - EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); - EXPECT_EQ(metadata->name(), object_name); - EXPECT_EQ(metadata->size(), kDesiredSize); -} - -TEST_F(AsyncClientIntegrationTest, ResumeFinalizedAppendableObjectUpload) { - // Skipping the test till we get the takeover feature on testbench. - GTEST_SKIP(); - auto async = AsyncClient(TestOptions()); - auto client = MakeIntegrationTestClient(true, TestOptions()); - auto object_name = MakeRandomObjectName(); - // Create a small block to send over and over. - auto constexpr kBlockSize = static_cast(256 * 1024); - auto const block = MakeRandomData(kBlockSize); - - auto create = - client.CreateBucket(bucket_name(), storage::BucketMetadata{} - .set_location("us-west4") - .set_storage_class("RAPID")); - if (!create && create.status().code() != StatusCode::kAlreadyExists) { - GTEST_FAIL() << "cannot create bucket: " << create.status(); - } - auto w = - async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) - .get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - - auto metadata = writer.Finalize(std::move(token), WritePayload(block)).get(); - ASSERT_STATUS_OK(metadata); - ScheduleForDelete(*metadata); - - EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); - EXPECT_EQ(metadata->name(), object_name); - EXPECT_EQ(metadata->size(), kBlockSize); - - auto object_metadata = client.GetObjectMetadata(bucket_name(), object_name); - ASSERT_STATUS_OK(object_metadata); - auto m = *object_metadata; - auto generation = m.generation(); - - w = async - .ResumeAppendableObjectUpload(BucketName(bucket_name()), object_name, - generation) - .get(); - ASSERT_STATUS_OK(w); - std::tie(writer, token) = *std::move(w); - EXPECT_FALSE(token.valid()); - EXPECT_THAT(writer.PersistedState(), VariantWith( - IsProtoEqual(*metadata))); -} - -TEST_F(AsyncClientIntegrationTest, ExplicitFlushAppendableObjectUpload) { - if (!UsingEmulator()) GTEST_SKIP(); - auto async = AsyncClient(TestOptions()); - auto client = MakeIntegrationTestClient(true, TestOptions()); - auto object_name = MakeRandomObjectName(); - // Create a small block to send over and over. - auto constexpr kBlockSize = static_cast(256 * 1024); - auto const block = MakeRandomData(kBlockSize); - - auto create = - client.CreateBucket(bucket_name(), storage::BucketMetadata{} - .set_location("us-west4") - .set_storage_class("RAPID")); - if (!create && create.status().code() != StatusCode::kAlreadyExists) { - GTEST_FAIL() << "cannot create bucket: " << create.status(); - } - auto w = - async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) - .get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - - auto p = writer.Write(std::move(token), WritePayload(block)).get(); - ASSERT_STATUS_OK(p); - token = *std::move(p); - - // Explicitly flush the data. - auto flush_status = writer.Flush().get(); - EXPECT_STATUS_OK(flush_status); - - auto metadata = writer.Finalize(std::move(token)).get(); - ASSERT_STATUS_OK(metadata); - ScheduleForDelete(*metadata); - - EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); - EXPECT_EQ(metadata->name(), object_name); - EXPECT_EQ(metadata->size(), kBlockSize); -} - -TEST_F(AsyncClientIntegrationTest, Open) { - if (!UsingEmulator()) GTEST_SKIP(); - auto async = AsyncClient(TestOptions()); - auto client = MakeIntegrationTestClient(true, TestOptions()); - auto object_name = MakeRandomObjectName(); - - auto create = client.CreateBucket( - bucket_name(), storage::BucketMetadata{}.set_location("us-west4")); - if (!create && create.status().code() != StatusCode::kAlreadyExists) { - GTEST_FAIL() << "cannot create bucket: " << create.status(); - } - - auto constexpr kSize = 8 * 1024; - auto constexpr kStride = 2 * kSize; - auto constexpr kBlockCount = 4; - auto const block = MakeRandomData(kSize); - - auto w = - async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) - .get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - for (int i = 0; i != kBlockCount; ++i) { - auto p = writer.Write(std::move(token), WritePayload(block)).get(); - ASSERT_STATUS_OK(p); - token = *std::move(p); - } - - auto metadata = writer.Finalize(std::move(token)).get(); - ASSERT_STATUS_OK(metadata); - - auto spec = google::storage::v2::BidiReadObjectSpec{}; - spec.set_bucket(BucketName(bucket_name()).FullName()); - spec.set_object(object_name); - auto descriptor = async.Open(spec).get(); - ASSERT_STATUS_OK(descriptor); - - AsyncReader r0; - AsyncToken t0; - auto actual0 = std::string{}; - std::tie(r0, t0) = descriptor->Read(0 * kStride, kSize); - while (t0.valid()) { - auto read = r0.Read(std::move(t0)).get(); - ASSERT_STATUS_OK(read); - ReadPayload p; - AsyncToken t; - std::tie(p, t) = *std::move(read); - for (auto sv : p.contents()) actual0 += std::string(sv); - t0 = std::move(t); - } - - EXPECT_EQ(actual0.size(), kSize); - client.DeleteObject(bucket_name(), object_name, - storage::Generation(metadata->generation())); -} - -TEST_F(AsyncClientIntegrationTest, OpenExceedMaximumRange) { - if (!UsingEmulator()) GTEST_SKIP(); - auto async = - AsyncClient(TestOptions().set(1024)); - auto client = MakeIntegrationTestClient(true, TestOptions()); - auto object_name = MakeRandomObjectName(); - - auto create = client.CreateBucket( - bucket_name(), storage::BucketMetadata{}.set_location("us-west4")); - if (!create && create.status().code() != StatusCode::kAlreadyExists) { - GTEST_FAIL() << "cannot create bucket: " << create.status(); - } - - auto constexpr kSize = 2048; - auto const block = MakeRandomData(kSize); - - auto w = - async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) - .get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - auto p = writer.Write(std::move(token), WritePayload(block)).get(); - ASSERT_STATUS_OK(p); - token = *std::move(p); - - auto metadata = writer.Finalize(std::move(token)).get(); - ASSERT_STATUS_OK(metadata); - - auto spec = google::storage::v2::BidiReadObjectSpec{}; - spec.set_bucket(BucketName(bucket_name()).FullName()); - spec.set_object(object_name); - auto descriptor = async.Open(spec).get(); - ASSERT_STATUS_OK(descriptor); - - AsyncReader r0; - AsyncToken t0; - auto actual0 = std::string{}; - std::tie(r0, t0) = descriptor->Read(0, kSize); - while (t0.valid()) { - auto read = r0.Read(std::move(t0)).get(); - ASSERT_STATUS_OK(read); - ReadPayload p; - AsyncToken t; - std::tie(p, t) = *std::move(read); - for (auto sv : p.contents()) actual0 += std::string(sv); - t0 = std::move(t); - } - - EXPECT_EQ(actual0.size(), kSize); - client.DeleteObject(bucket_name(), object_name, - storage::Generation(metadata->generation())); + EXPECT_EQ(composed->size(), block1.size() + block2.size()); } } // namespace @@ -1065,4 +417,4 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace cloud } // namespace google -#endif // GOOGLE_CLOUD_CPP_STORAGE_HAVE_GRPC +#endif // GOOGLE_CLOUD_CPP_STORAGE_HAVE_GRPC \ No newline at end of file diff --git a/google/cloud/storage/tests/object_rewrite_integration_test.cc b/google/cloud/storage/tests/object_rewrite_integration_test.cc index ce4d49c26eac1..4519ebab467fb 100644 --- a/google/cloud/storage/tests/object_rewrite_integration_test.cc +++ b/google/cloud/storage/tests/object_rewrite_integration_test.cc @@ -293,6 +293,82 @@ TEST_F(ObjectRewriteIntegrationTest, ComposeSimple) { EXPECT_EQ(meta->size() * 2, composed_meta->size()); } +TEST_F(ObjectRewriteIntegrationTest, ComposeDeleteSourceObjectsFalse) { + auto client = MakeIntegrationTestClient(); + + auto object_name1 = MakeRandomObjectName(); + StatusOr meta1 = client.InsertObject( + bucket_name_, object_name1, LoremIpsum(), IfGenerationMatch(0)); + ASSERT_STATUS_OK(meta1); + ScheduleForDelete(*meta1); + + auto object_name2 = MakeRandomObjectName(); + StatusOr meta2 = client.InsertObject( + bucket_name_, object_name2, LoremIpsum(), IfGenerationMatch(0)); + ASSERT_STATUS_OK(meta2); + ScheduleForDelete(*meta2); + + auto composed_object_name = MakeRandomObjectName(); + std::vector source_objects = {{object_name1, {}, {}}, + {object_name2, {}, {}}}; + + StatusOr composed_meta = client.ComposeObject( + bucket_name_, source_objects, composed_object_name, + WithObjectMetadata(ObjectMetadata().set_content_type("plain/text")), + DeleteSourceObjects(false)); + ASSERT_STATUS_OK(composed_meta); + ScheduleForDelete(*composed_meta); + + EXPECT_EQ(meta1->size() + meta2->size(), composed_meta->size()); + + auto check1 = client.GetObjectMetadata(bucket_name_, object_name1); + EXPECT_STATUS_OK(check1) << "Source object 1 (" << object_name1 + << ") should NOT have been deleted."; + + auto check2 = client.GetObjectMetadata(bucket_name_, object_name2); + EXPECT_STATUS_OK(check2) << "Source object 2 (" << object_name2 + << ") should NOT have been deleted."; +} + +TEST_F(ObjectRewriteIntegrationTest, ComposeDeleteSourceObjectsTrue) { + auto client = MakeIntegrationTestClient(); + + auto object_name1 = MakeRandomObjectName(); + StatusOr meta1 = client.InsertObject( + bucket_name_, object_name1, LoremIpsum(), IfGenerationMatch(0)); + ASSERT_STATUS_OK(meta1); + ScheduleForDelete(*meta1); + + auto object_name2 = MakeRandomObjectName(); + StatusOr meta2 = client.InsertObject( + bucket_name_, object_name2, LoremIpsum(), IfGenerationMatch(0)); + ASSERT_STATUS_OK(meta2); + ScheduleForDelete(*meta2); + + auto composed_object_name = MakeRandomObjectName(); + std::vector source_objects = {{object_name1, {}, {}}, + {object_name2, {}, {}}}; + + StatusOr composed_meta = client.ComposeObject( + bucket_name_, source_objects, composed_object_name, + WithObjectMetadata(ObjectMetadata().set_content_type("plain/text")), + DeleteSourceObjects(true)); + ASSERT_STATUS_OK(composed_meta); + ScheduleForDelete(*composed_meta); + + EXPECT_EQ(meta1->size() + meta2->size(), composed_meta->size()); + + auto check1 = client.GetObjectMetadata(bucket_name_, object_name1); + EXPECT_FALSE(check1.ok()); + EXPECT_EQ(StatusCode::kNotFound, check1.status().code()) + << "Source object 1 (" << object_name1 << ") should have been deleted."; + + auto check2 = client.GetObjectMetadata(bucket_name_, object_name2); + EXPECT_FALSE(check2.ok()); + EXPECT_EQ(StatusCode::kNotFound, check2.status().code()) + << "Source object 2 (" << object_name2 << ") should have been deleted."; +} + TEST_F(ObjectRewriteIntegrationTest, ComposedUsingEncryptedObject) { // TODO(#14385) - the emulator does not support this feature for gRPC. if (UsingEmulator() && UsingGrpc()) GTEST_SKIP(); diff --git a/google/cloud/storage/well_known_parameters.h b/google/cloud/storage/well_known_parameters.h index b924322ea0c14..5493f4f4874eb 100644 --- a/google/cloud/storage/well_known_parameters.h +++ b/google/cloud/storage/well_known_parameters.h @@ -605,6 +605,14 @@ struct ReturnPartialSuccess } }; +struct DeleteSourceObjects + : public internal::WellKnownParameter { + using WellKnownParameter::WellKnownParameter; + static char const* well_known_parameter_name() { + return "deleteSourceObjects"; + } +}; + GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage } // namespace cloud From a69a725c4f0b07ba787cedaf3200c471b4f51ec2 Mon Sep 17 00:00:00 2001 From: Vaibhav Pratap Date: Wed, 13 May 2026 08:31:26 +0000 Subject: [PATCH 2/3] add implementation of delete object source --- .bazelignore | 1 - .../internal/grpc/object_request_parser.cc | 3 + .../cloud/storage/internal/object_requests.cc | 3 + .../tests/async_client_integration_test.cc | 1300 ++++++++++++----- 4 files changed, 980 insertions(+), 327 deletions(-) diff --git a/.bazelignore b/.bazelignore index 67a49d97379c7..98112232070b2 100644 --- a/.bazelignore +++ b/.bazelignore @@ -5,4 +5,3 @@ cmake-build-debug/ cmake-build-coverage/ cmake-build-release/ .build/ -build_/ diff --git a/google/cloud/storage/internal/grpc/object_request_parser.cc b/google/cloud/storage/internal/grpc/object_request_parser.cc index 5c6e1027a2d72..4ef0ab303f973 100644 --- a/google/cloud/storage/internal/grpc/object_request_parser.cc +++ b/google/cloud/storage/internal/grpc/object_request_parser.cc @@ -371,6 +371,9 @@ StatusOr ToProto( request.GetOption().value()); } result.set_kms_key(request.GetOption().value_or("")); + if (request.GetOption().value_or(false)) { + result.set_delete_source_objects(true); + } return result; } diff --git a/google/cloud/storage/internal/object_requests.cc b/google/cloud/storage/internal/object_requests.cc index 39d2c968cf271..07d5a0c95c40a 100644 --- a/google/cloud/storage/internal/object_requests.cc +++ b/google/cloud/storage/internal/object_requests.cc @@ -379,6 +379,9 @@ std::string ComposeObjectRequest::JsonPayload() const { source_object_list.emplace_back(std::move(source_object_json)); } compose_object_payload_json["sourceObjects"] = source_object_list; + if (HasOption() && GetOption().value()) { + compose_object_payload_json["deleteSourceObjects"] = true; + } return compose_object_payload_json.dump(); } diff --git a/google/cloud/storage/tests/async_client_integration_test.cc b/google/cloud/storage/tests/async_client_integration_test.cc index f2a52e456927d..8bfccb3f2db16 100644 --- a/google/cloud/storage/tests/async_client_integration_test.cc +++ b/google/cloud/storage/tests/async_client_integration_test.cc @@ -17,8 +17,8 @@ #include "google/cloud/storage/async/bucket_name.h" #include "google/cloud/storage/async/client.h" #include "google/cloud/storage/async/idempotency_policy.h" +#include "google/cloud/storage/async/options.h" #include "google/cloud/storage/async/read_all.h" -#include "google/cloud/opentelemetry_options.h" #include "google/cloud/storage/grpc_plugin.h" #include "google/cloud/storage/testing/storage_integration_test.h" #include "google/cloud/grpc_options.h" @@ -32,8 +32,6 @@ #include #include #include -#include -#include #include namespace google { @@ -54,7 +52,15 @@ using ::testing::VariantWith; class AsyncClientIntegrationTest : public google::cloud::storage::testing::StorageIntegrationTest { -protected: + protected: + void SetUp() override { + bucket_name_ = + GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_BUCKET_NAME").value_or(""); + ASSERT_THAT(bucket_name_, Not(IsEmpty())) + << "GOOGLE_CLOUD_CPP_STORAGE_TEST_BUCKET_NAME is not set"; + } + + std::string const& bucket_name() const { return bucket_name_; } using google::cloud::storage::testing::StorageIntegrationTest:: ScheduleForDelete; @@ -65,350 +71,992 @@ class AsyncClientIntegrationTest .set_generation(object.generation())); } -private: + private: std::string bucket_name_; }; -namespace gcs = ::google::cloud::storage; +auto TestOptions() { + // Disable metrics in the test, they just make the logs harder to grok. + return Options{} + .set(false) + .set(1) + .set(TracingOptions().SetOptions( + "truncate_string_field_longer_than=2048")); +} + +auto AlwaysRetry() { + return TestOptions().set( + MakeAlwaysRetryAsyncIdempotencyPolicy); +} -// auto AlwaysRetry() { -// return google::cloud::Options{}.set( -// MakeAlwaysRetryIdempotencyPolicy); -// } +TEST_F(AsyncClientIntegrationTest, ObjectCRUD) { + auto async = AsyncClient(TestOptions()); + auto object_name = MakeRandomObjectName(); -google::cloud::Options MakeOptions(google::cloud::Options opts) { - auto fallback = google::cloud::Options{}; - if (auto v = google::cloud::internal::GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_GRPC_ENDPOINT")) { - fallback.set(*v); + auto insert = async + .InsertObject(BucketName(bucket_name()), object_name, + LoremIpsum(), AlwaysRetry()) + .get(); + ASSERT_STATUS_OK(insert); + ScheduleForDelete(*insert); + + auto full0 = async.ReadObjectRange(BucketName(bucket_name()), object_name, 0, + LoremIpsum().size()); + auto full1 = async.ReadObjectRange(BucketName(bucket_name()), object_name, 0, + LoremIpsum().size()); + auto partial0 = async.ReadObjectRange(BucketName(bucket_name()), object_name, + 2, LoremIpsum().size()); + auto partial1 = async.ReadObjectRange(BucketName(bucket_name()), object_name, + 2, LoremIpsum().size()); + + for (auto* p : {&full1, &full0}) { + auto response = p->get(); + ASSERT_STATUS_OK(response); + auto contents = response->contents(); + auto const full = std::accumulate(contents.begin(), contents.end(), + std::string{}, [](auto a, auto b) { + a += std::string(b); + return a; + }); + EXPECT_EQ(full, LoremIpsum()); } - if (auto v = google::cloud::internal::GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_JSON_ENDPOINT")) { - fallback.set(*v); + for (auto* p : {&partial1, &partial0}) { + auto response = p->get(); + ASSERT_STATUS_OK(response); + auto contents = response->contents(); + auto const partial = std::accumulate(contents.begin(), contents.end(), + std::string{}, [](auto a, auto b) { + a += std::string(b); + return a; + }); + EXPECT_EQ(partial, LoremIpsum().substr(2)); } - if (auto v = google::cloud::internal::GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_AUTHORITY")) { - fallback.set(*v); + + auto status = async + .DeleteObject(BucketName(bucket_name()), object_name, + insert->generation()) + .get(); + EXPECT_STATUS_OK(status); + + auto request = google::storage::v2::ReadObjectRequest{}; + request.set_bucket(insert->bucket()); + request.set_object(insert->name()); + request.set_generation(insert->generation()); + auto head = async.ReadObjectRange(request, /*offset=*/0, /*limit=*/1).get(); + EXPECT_THAT(head, StatusIs(StatusCode::kNotFound)); +} + +TEST_F(AsyncClientIntegrationTest, ComposeObject) { + auto async = AsyncClient(TestOptions()); + auto o1 = MakeRandomObjectName(); + auto o2 = MakeRandomObjectName(); + auto destination = MakeRandomObjectName(); + + auto insert1 = async.InsertObject(BucketName(bucket_name()), o1, LoremIpsum(), + AlwaysRetry()); + auto insert2 = async.InsertObject(BucketName(bucket_name()), o2, LoremIpsum(), + AlwaysRetry()); + std::vector> inserted{insert1.get(), + insert2.get()}; + for (auto const& insert : inserted) { + ASSERT_STATUS_OK(insert); + ScheduleForDelete(*insert); } - if (auto v = google::cloud::internal::GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_TARGET_API_VERSION")) { - fallback.set(*v); + std::vector sources; + std::transform(inserted.begin(), inserted.end(), std::back_inserter(sources), + [](auto const& o) { + google::storage::v2::ComposeObjectRequest::SourceObject r; + r.set_name(o->name()); + r.set_generation(o->generation()); + return r; + }); + auto pending = async.ComposeObject(BucketName(bucket_name()), destination, + std::move(sources)); + auto const composed = pending.get(); + EXPECT_STATUS_OK(composed); + ScheduleForDelete(*composed); + + auto read = async + .ReadObjectRange(BucketName(bucket_name()), destination, 0, + 2 * LoremIpsum().size()) + .get(); + ASSERT_STATUS_OK(read); + auto contents = read->contents(); + auto const full_contents = std::accumulate(contents.begin(), contents.end(), + std::string{}, [](auto a, auto b) { + a += std::string(b); + return a; + }); + EXPECT_EQ(full_contents, LoremIpsum() + LoremIpsum()); + EXPECT_THAT(read->metadata(), Optional(IsProtoEqual(*composed))); +} + +TEST_F(AsyncClientIntegrationTest, StreamingRead) { + auto async = AsyncClient(TestOptions()); + auto object_name = MakeRandomObjectName(); + // Create a relatively large object so the streaming read makes sense. We + // aim for something around 5MiB, enough for 3 `Read()` calls. + auto constexpr kLineSize = 64; + auto constexpr kLineCount = 5 * 1024 * 1024 / kLineSize; + auto const block = MakeRandomData(kLineSize); + std::vector insert_data(kLineCount); + std::generate(insert_data.begin(), insert_data.end(), [&, n = 0]() mutable { + return std::to_string(++n) + ": " + block; + }); + auto const expected_size = std::accumulate( + insert_data.begin(), insert_data.end(), static_cast(0), + [](auto a, auto const& b) { return a + b.size(); }); + + auto insert = async + .InsertObject(BucketName(bucket_name()), object_name, + insert_data, AlwaysRetry()) + .get(); + ASSERT_STATUS_OK(insert); + ScheduleForDelete(*insert); + + ASSERT_EQ(insert->size(), expected_size); + + auto r = async.ReadObject(BucketName(bucket_name()), object_name).get(); + ASSERT_STATUS_OK(r); + AsyncReader reader; + AsyncToken token; + std::tie(reader, token) = *std::move(r); + + std::string actual; + while (token.valid()) { + auto p = reader.Read(std::move(token)).get(); + ASSERT_STATUS_OK(p); + ReadPayload payload; + std::tie(payload, token) = *std::move(p); + for (auto v : payload.contents()) actual += std::string(v); } - fallback.set(false); - return google::cloud::internal::MergeOptions(std::move(opts), fallback); -} - - -google::cloud::storage::Client MakeGrpcClient(std::string project_id) { - auto options = MakeOptions(google::cloud::Options{} - .set(project_id)); - return google::cloud::storage::MakeGrpcClient(std::move(options)); -} - -google::cloud::storage::AsyncClient MakeAsyncClient(std::string project_id) { - auto options = MakeOptions(google::cloud::Options{} - .set(project_id) - .set({"rpc"}) - .set(true)); - return google::cloud::storage::AsyncClient(options); -} - -class ThreadPool { -public: - // Constructor initializes the thread pool with a given number of worker threads. - ThreadPool(size_t threads) : stop_(false) { - if (threads == 0) { - throw std::invalid_argument("Thread count cannot be zero."); - } - for (size_t i = 0; i < threads; ++i) { - workers_.emplace_back([this] { - while (true) { - std::function task; - { - // Acquire a lock on the task queue. - std::unique_lock lock(this->queue_mutex_); - - // Wait for a task to be available or for the pool to stop. - this->condition_.wait(lock, [this] { - return this->stop_ || !this->tasks_.empty(); - }); - - // If the pool is stopping and no tasks are left, exit the thread. - if (this->stop_ && this->tasks_.empty()) { - return; - } - - // Get the next task from the queue. - task = std::move(this->tasks_.front()); - this->tasks_.pop(); - } - - // Execute the task. - task(); - } - }); - } - } - - // Adds a new task to the thread pool. - template - auto enqueue(F&& f, Args&&... args) -> std::future::type> { - using return_type = typename std::result_of::type; - - // Create a packaged_task to wrap the function and its arguments. - auto task = std::make_shared>( - std::bind(std::forward(f), std::forward(args)...) - ); - - std::future res = task->get_future(); - { - // Acquire a lock on the queue and push the task. - std::unique_lock lock(queue_mutex_); - - // Don't allow enqueueing after stopping the pool. - if (stop_) { - throw std::runtime_error("enqueue on stopped ThreadPool"); - } - - tasks_.emplace([task]() { (*task)(); }); - } - - // Notify one waiting thread that a new task is available. - condition_.notify_one(); - return res; - } - - // Destructor stops all worker threads and joins them. - ~ThreadPool() { - { - std::unique_lock lock(queue_mutex_); - stop_ = true; - } - - // Notify all threads so they can wake up and exit their loops. - condition_.notify_all(); - - for (std::thread& worker : workers_) { - worker.join(); - } - } - -private: - std::vector workers_; - std::queue> tasks_; - - std::mutex queue_mutex_; - std::condition_variable condition_; - - bool stop_; -}; + EXPECT_EQ(actual.size(), expected_size); + auto view = absl::string_view(actual); + for (auto const& expected : insert_data) { + ASSERT_GE(view.size(), expected.size()); + ASSERT_EQ(expected, view.substr(0, expected.size())); + view.remove_prefix(expected.size()); + } + EXPECT_EQ(view, absl::string_view{}); +} + +TEST_F(AsyncClientIntegrationTest, StreamingReadRange) { + auto async = AsyncClient(TestOptions()); + auto object_name = MakeRandomObjectName(); + // Create a relatively large object so the streaming read makes sense. We + // aim for something around 5MiB, enough for 3 `Read()` calls. + auto constexpr kLineSize = 64; + auto constexpr kLineCount = 5 * 1024 * 1024 / kLineSize; + auto constexpr kReadOffset = kLineCount * kLineSize / 2; + auto const block = MakeRandomData(kLineSize - 1) + "\n"; + std::string contents; + for (int i = 0; i != kLineCount; ++i) contents += block; + auto const expected_insert_size = contents.size(); + + auto insert = async + .InsertObject(BucketName(bucket_name()), object_name, + contents, AlwaysRetry()) + .get(); + ASSERT_STATUS_OK(insert); + ScheduleForDelete(*insert); + + ASSERT_EQ(insert->size(), expected_insert_size); + + auto request = google::storage::v2::ReadObjectRequest{}; + request.set_bucket(insert->bucket()); + request.set_object(insert->name()); + request.set_generation(insert->generation()); + request.set_read_offset(kReadOffset); + auto r = async.ReadObject(request).get(); + ASSERT_STATUS_OK(r); + AsyncReader reader; + AsyncToken token; + std::tie(reader, token) = *std::move(r); + + std::string actual; + while (token.valid()) { + auto p = reader.Read(std::move(token)).get(); + ASSERT_STATUS_OK(p); + ReadPayload payload; + std::tie(payload, token) = *std::move(p); + for (auto v : payload.contents()) actual += std::string(v); + } + + EXPECT_EQ(absl::string_view(actual), + absl::string_view(contents).substr(kReadOffset)); +} + +TEST_F(AsyncClientIntegrationTest, StartUnbufferedUploadEmpty) { + auto client = AsyncClient(TestOptions()); + auto object_name = MakeRandomObjectName(); + + auto w = client.StartUnbufferedUpload(BucketName(bucket_name()), object_name) + .get(); + ASSERT_STATUS_OK(w); + AsyncWriter writer; + AsyncToken token; + std::tie(writer, token) = *std::move(w); + + auto metadata = writer.Finalize(std::move(token)).get(); + ASSERT_STATUS_OK(metadata); + ScheduleForDelete(*metadata); + + EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); + EXPECT_EQ(metadata->name(), object_name); + EXPECT_EQ(metadata->size(), 0); +} + +TEST_F(AsyncClientIntegrationTest, StartUnbufferedUploadMultiple) { + auto client = AsyncClient(TestOptions()); + auto object_name = MakeRandomObjectName(); + // Create a small block to send over and over. + auto constexpr kBlockSize = 256 * 1024; + auto constexpr kBlockCount = 16; + auto const block = MakeRandomData(kBlockSize); + + auto w = client.StartUnbufferedUpload(BucketName(bucket_name()), object_name) + .get(); + ASSERT_STATUS_OK(w); + AsyncWriter writer; + AsyncToken token; + std::tie(writer, token) = *std::move(w); + for (int i = 0; i != kBlockCount; ++i) { + auto p = writer.Write(std::move(token), WritePayload(block)).get(); + ASSERT_STATUS_OK(p); + token = *std::move(p); + } + + auto metadata = writer.Finalize(std::move(token)).get(); + ASSERT_STATUS_OK(metadata); + ScheduleForDelete(*metadata); + + EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); + EXPECT_EQ(metadata->name(), object_name); + EXPECT_EQ(metadata->size(), kBlockCount * kBlockSize); +} + +TEST_F(AsyncClientIntegrationTest, StartUnbufferedUploadResume) { + auto client = AsyncClient(TestOptions()); + auto object_name = MakeRandomObjectName(); + // Create a small block to send over and over. + auto constexpr kBlockSize = 256 * 1024; + auto constexpr kInitialBlockCount = 4; + auto constexpr kTotalBlockCount = 4 + kInitialBlockCount; + auto constexpr kDesiredSize = kBlockSize * kTotalBlockCount; + auto const block = MakeRandomData(kBlockSize); + + auto w = client.StartUnbufferedUpload(BucketName(bucket_name()), object_name) + .get(); + ASSERT_STATUS_OK(w); + AsyncWriter writer; + AsyncToken token; + std::tie(writer, token) = *std::move(w); + + auto const upload_id = writer.UploadId(); + for (int i = 0; i != kInitialBlockCount - 1; ++i) { + auto p = writer.Write(std::move(token), WritePayload(block)).get(); + ASSERT_STATUS_OK(p); + token = *std::move(p); + } + + // Reset the existing writer and resume the upload. + writer = AsyncWriter(); + w = client.ResumeUnbufferedUpload(upload_id).get(); + ASSERT_STATUS_OK(w); + std::tie(writer, token) = *std::move(w); + ASSERT_EQ(writer.UploadId(), upload_id); + auto const persisted = writer.PersistedState(); + // We don't expect this to be larger that the total size of the object. + // Incidentally, this shows the value fits into an `int`. + ASSERT_THAT(persisted, VariantWith(Le(kDesiredSize))); + // Cast to `int` because otherwise we need to write multiple casts below. + auto offset = static_cast(absl::get(persisted)); + if (offset % kBlockSize != 0) { + auto s = block.substr(offset % kBlockSize); + auto const size = s.size(); + auto p = writer.Write(std::move(token), WritePayload(std::move(s))).get(); + ASSERT_STATUS_OK(p); + offset += static_cast(size); + token = *std::move(p); + } + while (offset < kDesiredSize) { + auto const n = std::min(kBlockSize, kDesiredSize - offset); + auto p = + writer.Write(std::move(token), WritePayload(block.substr(0, n))).get(); + ASSERT_STATUS_OK(p); + offset += n; + token = *std::move(p); + } + + auto metadata = writer.Finalize(std::move(token)).get(); + ASSERT_STATUS_OK(metadata); + ScheduleForDelete(*metadata); + EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); + EXPECT_EQ(metadata->name(), object_name); + EXPECT_EQ(metadata->size(), kDesiredSize); +} + +TEST_F(AsyncClientIntegrationTest, StartUnbufferedUploadResumeFinalized) { + auto client = AsyncClient(TestOptions()); + auto object_name = MakeRandomObjectName(); + // Create a small block to send over and over. + auto constexpr kBlockSize = static_cast(256 * 1024); + auto const block = MakeRandomData(kBlockSize); -void ReadRangeTask(std::shared_ptr descriptor, - std::int64_t& offset, - std::int64_t& limit) { - AsyncReader r; + auto w = client.StartUnbufferedUpload(BucketName(bucket_name()), object_name) + .get(); + ASSERT_STATUS_OK(w); + AsyncWriter writer; + AsyncToken token; + std::tie(writer, token) = *std::move(w); + + auto const upload_id = writer.UploadId(); + auto metadata = writer.Finalize(std::move(token), WritePayload(block)).get(); + ASSERT_STATUS_OK(metadata); + ScheduleForDelete(*metadata); + + EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); + EXPECT_EQ(metadata->name(), object_name); + EXPECT_EQ(metadata->size(), kBlockSize); + + w = client.ResumeUnbufferedUpload(upload_id).get(); + ASSERT_STATUS_OK(w); + std::tie(writer, token) = *std::move(w); + EXPECT_FALSE(token.valid()); + EXPECT_THAT(writer.PersistedState(), VariantWith( + IsProtoEqual(*metadata))); +} + +TEST_F(AsyncClientIntegrationTest, StartBufferedUploadEmpty) { + auto client = AsyncClient(TestOptions()); + auto object_name = MakeRandomObjectName(); + + auto w = + client.StartBufferedUpload(BucketName(bucket_name()), object_name).get(); + ASSERT_STATUS_OK(w); + AsyncWriter writer; + AsyncToken token; + std::tie(writer, token) = *std::move(w); + + auto metadata = writer.Finalize(std::move(token)).get(); + ASSERT_STATUS_OK(metadata); + ScheduleForDelete(*metadata); + + EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); + EXPECT_EQ(metadata->name(), object_name); + EXPECT_EQ(metadata->size(), 0); +} + +TEST_F(AsyncClientIntegrationTest, StartBufferedUploadMultiple) { + auto client = AsyncClient(TestOptions()); + auto object_name = MakeRandomObjectName(); + // Create a small block to send over and over. + auto constexpr kBlockSize = 256 * 1024; + auto constexpr kBlockCount = 16; + auto const block = MakeRandomData(kBlockSize); + + auto w = + client.StartBufferedUpload(BucketName(bucket_name()), object_name).get(); + ASSERT_STATUS_OK(w); + AsyncWriter writer; + AsyncToken token; + std::tie(writer, token) = *std::move(w); + for (int i = 0; i != kBlockCount; ++i) { + auto p = writer.Write(std::move(token), WritePayload(block)).get(); + ASSERT_STATUS_OK(p); + token = *std::move(p); + } + + auto metadata = writer.Finalize(std::move(token)).get(); + ASSERT_STATUS_OK(metadata); + ScheduleForDelete(*metadata); + + EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); + EXPECT_EQ(metadata->name(), object_name); + EXPECT_EQ(metadata->size(), kBlockCount * kBlockSize); +} + +TEST_F(AsyncClientIntegrationTest, RewriteObject) { + auto async = AsyncClient(TestOptions()); + auto o1 = MakeRandomObjectName(); + auto o2 = MakeRandomObjectName(); + + auto constexpr kBlockSize = 4 * 1024 * 1024; + auto insert = async + .InsertObject(BucketName(bucket_name()), o1, + MakeRandomData(kBlockSize), AlwaysRetry()) + .get(); + ASSERT_STATUS_OK(insert); + ScheduleForDelete(*insert); + + // Start a rewrite, but limit each iteration to a small number of bytes, to + // force multiple calls. + google::storage::v2::Object metadata; + AsyncRewriter rewriter; + AsyncToken token; + google::storage::v2::RewriteObjectRequest request; + request.set_destination_name(o2); + request.set_destination_bucket(BucketName(bucket_name()).FullName()); + request.set_source_object(o1); + request.set_source_bucket(BucketName(bucket_name()).FullName()); + request.set_max_bytes_rewritten_per_call(1024 * 1024); + std::tie(rewriter, token) = async.StartRewrite(std::move(request)); + while (token.valid()) { + auto rt = rewriter.Iterate(std::move(token)).get(); + ASSERT_STATUS_OK(rt); + google::storage::v2::RewriteResponse response; + AsyncToken t; + std::tie(response, t) = *std::move(rt); + token = std::move(t); + if (!response.has_resource()) continue; + metadata = response.resource(); + ScheduleForDelete(metadata); + EXPECT_FALSE(token.valid()); + } + EXPECT_EQ(metadata.name(), o2); + EXPECT_EQ(metadata.size(), insert->size()); +} + +TEST_F(AsyncClientIntegrationTest, RewriteObjectResume) { + auto async = AsyncClient(TestOptions()); + auto destination = + GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_DESTINATION_BUCKET_NAME"); + if (!destination || destination->empty()) GTEST_SKIP(); + + auto constexpr kBlockSize = 4 * 1024 * 1024; + auto source = + async + .InsertObject(BucketName(bucket_name()), MakeRandomObjectName(), + MakeRandomData(kBlockSize), AlwaysRetry()) + .get(); + ASSERT_STATUS_OK(source); + ScheduleForDelete(*source); + + // Start a rewrite, but limit each iteration to a small number of bytes, to + // force multiple calls. + AsyncRewriter rewriter; + AsyncToken token; + auto const expected_name = MakeRandomObjectName(); + google::storage::v2::RewriteObjectRequest start_request; + start_request.set_destination_name(expected_name); + start_request.set_destination_bucket(BucketName(*destination).FullName()); + start_request.set_source_object(source->name()); + start_request.set_source_bucket(source->bucket()); + start_request.set_max_bytes_rewritten_per_call(1024 * 1024); + std::tie(rewriter, token) = async.StartRewrite(start_request); + + auto rt = rewriter.Iterate(std::move(token)).get(); + ASSERT_STATUS_OK(rt); + google::storage::v2::RewriteResponse response; + AsyncToken t; + std::tie(response, t) = *std::move(rt); + + // We want to resume a partially completed resume. Verify the first rewrite + // did not complete things. + ASSERT_THAT(response.rewrite_token(), Not(IsEmpty())); + + google::storage::v2::RewriteObjectRequest resume_request; + resume_request.set_source_bucket(source->bucket()); + resume_request.set_source_object(source->name()); + resume_request.set_destination_bucket(BucketName(*destination).FullName()); + resume_request.set_destination_name(expected_name); + resume_request.set_max_bytes_rewritten_per_call(1024 * 1024); + std::tie(rewriter, token) = async.ResumeRewrite(std::move(resume_request)); + + google::storage::v2::Object metadata; + while (token.valid()) { + auto rt = rewriter.Iterate(std::move(token)).get(); + ASSERT_STATUS_OK(rt); AsyncToken t; - - // 1. Get the reader and token for the specified range - // MODIFIED: Call Read() directly on the descriptor object - std::tie(r, t) = descriptor->Read(offset, limit); - - // 2. Consume the entire stream for this range - while (t.valid()) { - auto read = r.Read(std::move(t)).get(); - - // ASSERT_STATUS_OK will flag the test as failed and abort this - // thread if the status is not OK. - ASSERT_STATUS_OK(read); - - ReadPayload p; - AsyncToken t_new; - std::tie(p, t_new) = *std::move(read); - t = std::move(t_new); - - // In this test, we are discarding the payload `p`, just as - // the original single-threaded loop did. - } -} - -/* -TEST_F(AsyncClientIntegrationTest, StartAppendableUploadEmpty) { - auto project_id = "bajajnehaa-devrel-test"; - // auto const kproject = google::cloud::Project(project_id); - - auto client = MakeGrpcClient(project_id); - - auto bucket_name = std::string{"gcs-grpc-team-fastbyte-bajajnehaa-test-us-west4"}; - auto object_name = "vaibhav-test-file-111"; - // auto placement = gcs::BucketCustomPlacementConfig{{"us-west4-a"}}; - // auto hns = gcs::BucketHierarchicalNamespace{true}; - // auto ubla = gcs::BucketIamConfiguration{gcs::UniformBucketLevelAccess{true, {}}, absl::nullopt}; - - auto constexpr kBlockSize = 1024; - auto constexpr kBlockCount = 100000; + std::tie(response, t) = *std::move(rt); + token = std::move(t); + if (!response.has_resource()) continue; + metadata = response.resource(); + ScheduleForDelete(metadata); + EXPECT_EQ(metadata.bucket(), BucketName(*destination).FullName()); + EXPECT_EQ(metadata.name(), expected_name); + EXPECT_EQ(metadata.size(), source->size()); + EXPECT_FALSE(token.valid()); + } +} + +TEST_F(AsyncClientIntegrationTest, InsertFailure) { + auto async = AsyncClient(TestOptions()); + + auto insert = async + .InsertObject(BucketName(MakeRandomBucketName()), + MakeRandomObjectName(), LoremIpsum()) + .get(); + ASSERT_THAT(insert, Not(IsOk())); +} + +TEST_F(AsyncClientIntegrationTest, ReadFailure) { + auto async = AsyncClient(TestOptions()); + + auto read = async + .ReadObject(BucketName(MakeRandomBucketName()), + MakeRandomObjectName()) + .get(); + // At the moment, only connectivity errors are detected before the first + // `Read()` call. Accept such failures too: + if (!read) return; + AsyncReader reader; + AsyncToken token; + std::tie(reader, token) = *std::move(read); + auto payload = ReadAll(std::move(reader), std::move(token)).get(); + ASSERT_THAT(payload, Not(IsOk())); +} + +TEST_F(AsyncClientIntegrationTest, ReadRangeFailure) { + auto async = AsyncClient(TestOptions()); + + auto payload = + async + .ReadObjectRange(BucketName(MakeRandomBucketName()), + MakeRandomObjectName(), /*offset=*/0, /*limit=*/1) + .get(); + ASSERT_THAT(payload, Not(IsOk())); +} + +TEST_F(AsyncClientIntegrationTest, StartBufferedUploadFailure) { + auto async = AsyncClient(TestOptions()); + + auto writer = async + .StartBufferedUpload(BucketName(MakeRandomBucketName()), + MakeRandomObjectName()) + .get(); + ASSERT_THAT(writer, Not(IsOk())); +} + +TEST_F(AsyncClientIntegrationTest, ResumeBufferedUploadFailure) { + auto async = AsyncClient(TestOptions()); + + auto writer = async.ResumeBufferedUpload("test-only-invalid-upload-id").get(); + ASSERT_THAT(writer, Not(IsOk())); +} + +TEST_F(AsyncClientIntegrationTest, StartUnbufferedUploadFailure) { + auto async = AsyncClient(TestOptions()); + + auto writer = async + .StartUnbufferedUpload(BucketName(MakeRandomBucketName()), + MakeRandomObjectName()) + .get(); + ASSERT_THAT(writer, Not(IsOk())); +} + +TEST_F(AsyncClientIntegrationTest, ResumeUnbufferedUploadFailure) { + auto async = AsyncClient(TestOptions()); + + auto writer = + async.ResumeUnbufferedUpload("test-only-invalid-upload-id").get(); + ASSERT_THAT(writer, Not(IsOk())); +} + +TEST_F(AsyncClientIntegrationTest, ComposeObjectFailure) { + auto async = AsyncClient(TestOptions()); + + auto make_source = [](std::string name) { + auto source = google::storage::v2::ComposeObjectRequest::SourceObject{}; + source.set_name(std::move(name)); + return source; + }; + auto composed = + async + .ComposeObject(BucketName(bucket_name()), MakeRandomObjectName(), + {make_source(MakeRandomObjectName()), + make_source(MakeRandomObjectName())}) + .get(); + ASSERT_THAT(composed, Not(IsOk())); +} + +TEST_F(AsyncClientIntegrationTest, DeleteObjectFailure) { + auto async = AsyncClient(TestOptions()); + + auto deleted = + async.DeleteObject(BucketName(bucket_name()), MakeRandomObjectName()) + .get(); + ASSERT_THAT(deleted, Not(IsOk())); +} + +TEST_F(AsyncClientIntegrationTest, StartRewriteFailure) { + auto async = AsyncClient(TestOptions()); + + AsyncRewriter rewriter; + AsyncToken token; + std::tie(rewriter, token) = + async.StartRewrite(BucketName(bucket_name()), MakeRandomObjectName(), + BucketName(bucket_name()), MakeRandomObjectName()); + ASSERT_TRUE(token.valid()); + auto iteration = rewriter.Iterate(std::move(token)).get(); + ASSERT_THAT(iteration, Not(IsOk())); +} + +TEST_F(AsyncClientIntegrationTest, ResumeRewriteFailure) { + auto async = AsyncClient(TestOptions()); + + AsyncRewriter rewriter; + AsyncToken token; + std::tie(rewriter, token) = + async.ResumeRewrite(BucketName(bucket_name()), MakeRandomObjectName(), + BucketName(bucket_name()), MakeRandomObjectName(), + "test-only-invalid-rewrite-token"); + ASSERT_TRUE(token.valid()); + auto iteration = rewriter.Iterate(std::move(token)).get(); + ASSERT_THAT(iteration, Not(IsOk())); +} + +TEST_F(AsyncClientIntegrationTest, StartAppendableObjectUploadEmpty) { + if (!UsingEmulator()) GTEST_SKIP(); + auto async = AsyncClient(TestOptions()); + auto client = MakeIntegrationTestClient(true, TestOptions()); + auto object_name = MakeRandomObjectName(); + + auto create = + client.CreateBucket(bucket_name(), storage::BucketMetadata{} + .set_location("us-west4") + .set_storage_class("RAPID")); + if (!create && create.status().code() != StatusCode::kAlreadyExists) { + GTEST_FAIL() << "cannot create bucket: " << create.status(); + } + auto w = + async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) + .get(); + ASSERT_STATUS_OK(w); + AsyncWriter writer; + AsyncToken token; + std::tie(writer, token) = *std::move(w); + + auto metadata = writer.Finalize(std::move(token)).get(); + ASSERT_STATUS_OK(metadata); + ScheduleForDelete(*metadata); + + EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); + EXPECT_EQ(metadata->name(), object_name); + EXPECT_EQ(metadata->size(), 0); +} + +TEST_F(AsyncClientIntegrationTest, StartAppendableObjectUploadMultiple) { + if (!UsingEmulator()) GTEST_SKIP(); + auto async = AsyncClient(TestOptions()); + auto client = MakeIntegrationTestClient(true, TestOptions()); + auto object_name = MakeRandomObjectName(); + // Create a small block to send over and over. + auto constexpr kBlockSize = 256 * 1024; + auto constexpr kBlockCount = 16; auto const block = MakeRandomData(kBlockSize); - auto const block2 = MakeRandomData(kBlockSize); - - auto async = MakeAsyncClient(project_id); - // auto w = async.StartBufferedUpload(BucketName(bucket_name), object_name) - // .get(); - // ASSERT_STATUS_OK(w); - auto w = async.StartAppendableObjectUpload(BucketName(bucket_name), object_name) - .get(); + + auto create = + client.CreateBucket(bucket_name(), storage::BucketMetadata{} + .set_location("us-west4") + .set_storage_class("RAPID")); + if (!create && create.status().code() != StatusCode::kAlreadyExists) { + GTEST_FAIL() << "cannot create bucket: " << create.status(); + } + auto w = + async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) + .get(); ASSERT_STATUS_OK(w); + AsyncWriter writer; + AsyncToken token; + std::tie(writer, token) = *std::move(w); + for (int i = 0; i != kBlockCount; ++i) { + auto p = writer.Write(std::move(token), WritePayload(block)).get(); + ASSERT_STATUS_OK(p); + token = *std::move(p); + } + + auto metadata = writer.Finalize(std::move(token)).get(); + ASSERT_STATUS_OK(metadata); + ScheduleForDelete(*metadata); + EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); + EXPECT_EQ(metadata->name(), object_name); + EXPECT_EQ(metadata->size(), kBlockCount * kBlockSize); +} + +TEST_F(AsyncClientIntegrationTest, ResumeAppendableObjectUpload) { + // Skipping the test till we get the takeover feature on testbench. + GTEST_SKIP(); + auto async = AsyncClient(TestOptions()); + auto client = MakeIntegrationTestClient(true, TestOptions()); + auto object_name = MakeRandomObjectName(); + // Create a small block to send over and over. + auto constexpr kBlockSize = 256 * 1024; + auto constexpr kInitialBlockCount = 4; + auto constexpr kTotalBlockCount = 4 + kInitialBlockCount; + auto constexpr kDesiredSize = kBlockSize * kTotalBlockCount; + auto const block = MakeRandomData(kBlockSize); + + auto create = + client.CreateBucket(bucket_name(), storage::BucketMetadata{} + .set_location("us-west4") + .set_storage_class("RAPID")); + if (!create && create.status().code() != StatusCode::kAlreadyExists) { + GTEST_FAIL() << "cannot create bucket: " << create.status(); + } + auto w = + async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) + .get(); + ASSERT_STATUS_OK(w); AsyncWriter writer; AsyncToken token; std::tie(writer, token) = *std::move(w); - for (int i = 0; i < kBlockCount; ++i) { - std::cout << "Writing data iteration #" << i << std::endl; + + for (int i = 0; i != kInitialBlockCount - 1; ++i) { auto p = writer.Write(std::move(token), WritePayload(block)).get(); ASSERT_STATUS_OK(p); token = *std::move(p); } - -// auto metadata1 = writer.Finalize(std::move(token)).get(); -// ASSERT_STATUS_OK(metadata1); -// std::cout << "Request metadata: " << metadata1->generation() << std::endl; - // EXPECT_EQ(1,2); - - auto close = writer.Close(); - - // auto object_metadata = client.GetObjectMetadata(bucket_name, object_name); - // auto m = *object_metadata; - // auto generation = m.generation(); - - // auto w1 = async.ResumeAppendableObjectUpload(BucketName(bucket_name), object_name, generation) - // .get(); - - // ASSERT_STATUS_OK(w1); - - // AsyncWriter writer1; - // AsyncToken token1; - // std::tie(writer1, token1) = *std::move(w1); - - // for (int i = 0; i < kBlockCount; ++i) { - // // std::cout << "Writing data iteration #" << i << std::endl; - // auto p = writer1.Write(std::move(token1), WritePayload(block)).get(); - // ASSERT_STATUS_OK(p); - // token1 = *std::move(p); - // } - - // // auto object_metadata1 = client.GetObjectMetadata(bucket_name, object_name); - // // auto m1 = *object_metadata1; - // // // auto generation1 = m1.generation(); - // // std::cout << "Object metadata1: " << m << std::endl; - - // auto metadata = writer1.Finalize(std::move(token1)).get(); - // ASSERT_STATUS_OK(metadata); - // // // ScheduleForDelete(*metadata); - - // EXPECT_EQ(metadata->bucket(), BucketName(bucket_name).FullName()); - // EXPECT_EQ(metadata->name()," object_name"); - // EXPECT_EQ(metadata->size(), kBlockCount * kBlockSize); - // EXPECT_EQ("dddd", "Sdfs"); - // std::cout << "Test completed successfully" << std::endl; - // client.DeleteObject(bucket_name, object_name); - -// auto spec = google::storage::v2::BidiReadObjectSpec{}; -// // std::cout << object_metadata->bucket() << "\n"; - -// // spec.set_bucket("projects/_/buckets/gcs-grpc-team-fastbyte-bajajnehaa-test-us-west4"); -// // spec.set_object(object_name); -// // auto descriptor_status = async.Open(spec).get(); -// // ASSERT_STATUS_OK(descriptor_status); -// // ObjectDescriptor descriptor = *std::move(descriptor_status); -// // auto descriptor_ptr = -// // std::make_shared(std::move(descriptor)); -// // std::shared_ptr descriptor_ptr = std::make_shared(*descriptor); - -// // --- Start of ThreadPool implementation --- - -// // 1. Initialize the ThreadPool -// // Use hardware_concurrency to get a reasonable number of threads -// size_t num_threads = std::thread::hardware_concurrency(); -// std::cout << "Starting ThreadPool with " << num_threads << " threads." << std::endl; -// ThreadPool pool(num_threads); - -// // 2. Define read parameters and storage for futures -// std::vector> futures; -// int num_reads = 1000; -// std::int64_t read_offset = 0; -// std::int64_t read_limit = 1024 * 1024 * 1024; // 1 GiB - -// std::cout << "Enqueuing " << num_reads << " read tasks..." << std::endl; - -// // 3. Enqueue all the read tasks -// // The original loop is replaced with this loop. -// for (int i = 0; i < num_reads; ++i) { -// // Pass *descriptor (the ObjectDescriptor) by value. -// // This is safe because it's a copyable wrapper. -// futures.push_back( -// pool.enqueue(ReadRangeTask, descriptor_ptr, read_offset, read_limit) -// ); -// } - -// // 4. Wait for all enqueued tasks to complete -// std::cout << "Waiting for all " << futures.size() << " read tasks to complete..." << std::endl; -// for (auto& f : futures) { -// f.get(); // This blocks until the future is ready. -// // If a task failed (e.g., via ASSERT_STATUS_OK), -// // gtest will have already flagged the failure. -// // If the task threw an exception, get() will re-throw it. -// } - -// std::cout << "All " << num_reads << " parallel read tasks completed." << std::endl; - - // --- End of ThreadPool implementation --- - - // auto actual0 = std::string{}; - // for(int i =0 ; i< 1000 ; i++){ - // std::tie(r0, t0) = descriptor->Read(0 , 1024* 1024* 1024); - // actual0 = std::string{}; - // while (t0.valid()) { - // auto read = r0.Read(std::move(t0)).get(); - // ASSERT_STATUS_OK(read); - // ReadPayload p; - // AsyncToken t; - // std::tie(p, t) = *std::move(read); - // t0 = std::move(t); - // } - // } - -// auto ans = block + block + block; - EXPECT_EQ(1,2); -} -*/ - - -TEST_F(AsyncClientIntegrationTest, ComposeObjectSimple) { - auto project_id = GetEnv("GOOGLE_CLOUD_PROJECT").value_or(""); - ASSERT_FALSE(project_id.empty()); - auto bucket_name = GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_BUCKET_NAME").value_or(""); - ASSERT_FALSE(bucket_name.empty()); - - // Use the standard sync client to hit the real GCS endpoint for ComposeObject. - auto client = MakeGrpcClient(project_id); - auto source_name_1 = MakeRandomObjectName(); - auto source_name_2 = MakeRandomObjectName(); - auto composed_name = MakeRandomObjectName(); - - auto const block1 = MakeRandomData(1024); - auto const block2 = MakeRandomData(1024); - - auto insert1 = client.InsertObject(bucket_name, source_name_1, block1); - ASSERT_STATUS_OK(insert1); - ScheduleForDelete(*insert1); - - auto insert2 = client.InsertObject(bucket_name, source_name_2, block2); - ASSERT_STATUS_OK(insert2); - ScheduleForDelete(*insert2); - - std::vector sources = {{source_name_1, {}, {}}, - {source_name_2, {}, {}}}; - - auto composed = client.ComposeObject(bucket_name, sources, composed_name); - ASSERT_STATUS_OK(composed); - ScheduleForDelete(*composed); - EXPECT_EQ(composed->size(), block1.size() + block2.size()); + writer.Close(); + + // Reset the existing writer and resume the upload. + writer = AsyncWriter(); + + auto object_metadata = client.GetObjectMetadata(bucket_name(), object_name); + ASSERT_STATUS_OK(object_metadata); + auto m = *object_metadata; + auto generation = m.generation(); + + w = async + .ResumeAppendableObjectUpload(BucketName(bucket_name()), object_name, + generation) + .get(); + ASSERT_STATUS_OK(w); + std::tie(writer, token) = *std::move(w); + auto const persisted = writer.PersistedState(); + ASSERT_THAT(persisted, VariantWith(Le(kDesiredSize))); + // Cast to `int` because otherwise we need to write multiple casts below. + auto offset = static_cast(absl::get(persisted)); + if (offset % kBlockSize != 0) { + auto s = block.substr(offset % kBlockSize); + auto const size = s.size(); + auto p = writer.Write(std::move(token), WritePayload(std::move(s))).get(); + ASSERT_STATUS_OK(p); + offset += static_cast(size); + token = *std::move(p); + } + while (offset < kDesiredSize) { + auto const n = std::min(kBlockSize, kDesiredSize - offset); + auto p = + writer.Write(std::move(token), WritePayload(block.substr(0, n))).get(); + ASSERT_STATUS_OK(p); + offset += n; + token = *std::move(p); + } + + auto metadata = writer.Finalize(std::move(token)).get(); + ASSERT_STATUS_OK(metadata); + ScheduleForDelete(*metadata); + + EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); + EXPECT_EQ(metadata->name(), object_name); + EXPECT_EQ(metadata->size(), kDesiredSize); +} + +TEST_F(AsyncClientIntegrationTest, ResumeFinalizedAppendableObjectUpload) { + // Skipping the test till we get the takeover feature on testbench. + GTEST_SKIP(); + auto async = AsyncClient(TestOptions()); + auto client = MakeIntegrationTestClient(true, TestOptions()); + auto object_name = MakeRandomObjectName(); + // Create a small block to send over and over. + auto constexpr kBlockSize = static_cast(256 * 1024); + auto const block = MakeRandomData(kBlockSize); + + auto create = + client.CreateBucket(bucket_name(), storage::BucketMetadata{} + .set_location("us-west4") + .set_storage_class("RAPID")); + if (!create && create.status().code() != StatusCode::kAlreadyExists) { + GTEST_FAIL() << "cannot create bucket: " << create.status(); + } + auto w = + async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) + .get(); + ASSERT_STATUS_OK(w); + AsyncWriter writer; + AsyncToken token; + std::tie(writer, token) = *std::move(w); + + auto metadata = writer.Finalize(std::move(token), WritePayload(block)).get(); + ASSERT_STATUS_OK(metadata); + ScheduleForDelete(*metadata); + + EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); + EXPECT_EQ(metadata->name(), object_name); + EXPECT_EQ(metadata->size(), kBlockSize); + + auto object_metadata = client.GetObjectMetadata(bucket_name(), object_name); + ASSERT_STATUS_OK(object_metadata); + auto m = *object_metadata; + auto generation = m.generation(); + + w = async + .ResumeAppendableObjectUpload(BucketName(bucket_name()), object_name, + generation) + .get(); + ASSERT_STATUS_OK(w); + std::tie(writer, token) = *std::move(w); + EXPECT_FALSE(token.valid()); + EXPECT_THAT(writer.PersistedState(), VariantWith( + IsProtoEqual(*metadata))); +} + +TEST_F(AsyncClientIntegrationTest, ExplicitFlushAppendableObjectUpload) { + if (!UsingEmulator()) GTEST_SKIP(); + auto async = AsyncClient(TestOptions()); + auto client = MakeIntegrationTestClient(true, TestOptions()); + auto object_name = MakeRandomObjectName(); + // Create a small block to send over and over. + auto constexpr kBlockSize = static_cast(256 * 1024); + auto const block = MakeRandomData(kBlockSize); + + auto create = + client.CreateBucket(bucket_name(), storage::BucketMetadata{} + .set_location("us-west4") + .set_storage_class("RAPID")); + if (!create && create.status().code() != StatusCode::kAlreadyExists) { + GTEST_FAIL() << "cannot create bucket: " << create.status(); + } + auto w = + async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) + .get(); + ASSERT_STATUS_OK(w); + AsyncWriter writer; + AsyncToken token; + std::tie(writer, token) = *std::move(w); + + auto p = writer.Write(std::move(token), WritePayload(block)).get(); + ASSERT_STATUS_OK(p); + token = *std::move(p); + + // Explicitly flush the data. + auto flush_status = writer.Flush().get(); + EXPECT_STATUS_OK(flush_status); + + auto metadata = writer.Finalize(std::move(token)).get(); + ASSERT_STATUS_OK(metadata); + ScheduleForDelete(*metadata); + + EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); + EXPECT_EQ(metadata->name(), object_name); + EXPECT_EQ(metadata->size(), kBlockSize); +} + +TEST_F(AsyncClientIntegrationTest, Open) { + if (!UsingEmulator()) GTEST_SKIP(); + auto async = AsyncClient(TestOptions()); + auto client = MakeIntegrationTestClient(true, TestOptions()); + auto object_name = MakeRandomObjectName(); + + auto create = client.CreateBucket( + bucket_name(), storage::BucketMetadata{}.set_location("us-west4")); + if (!create && create.status().code() != StatusCode::kAlreadyExists) { + GTEST_FAIL() << "cannot create bucket: " << create.status(); + } + + auto constexpr kSize = 8 * 1024; + auto constexpr kStride = 2 * kSize; + auto constexpr kBlockCount = 4; + auto const block = MakeRandomData(kSize); + + auto w = + async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) + .get(); + ASSERT_STATUS_OK(w); + AsyncWriter writer; + AsyncToken token; + std::tie(writer, token) = *std::move(w); + for (int i = 0; i != kBlockCount; ++i) { + auto p = writer.Write(std::move(token), WritePayload(block)).get(); + ASSERT_STATUS_OK(p); + token = *std::move(p); + } + + auto metadata = writer.Finalize(std::move(token)).get(); + ASSERT_STATUS_OK(metadata); + + auto spec = google::storage::v2::BidiReadObjectSpec{}; + spec.set_bucket(BucketName(bucket_name()).FullName()); + spec.set_object(object_name); + auto descriptor = async.Open(spec).get(); + ASSERT_STATUS_OK(descriptor); + + AsyncReader r0; + AsyncToken t0; + auto actual0 = std::string{}; + std::tie(r0, t0) = descriptor->Read(0 * kStride, kSize); + while (t0.valid()) { + auto read = r0.Read(std::move(t0)).get(); + ASSERT_STATUS_OK(read); + ReadPayload p; + AsyncToken t; + std::tie(p, t) = *std::move(read); + for (auto sv : p.contents()) actual0 += std::string(sv); + t0 = std::move(t); + } + + EXPECT_EQ(actual0.size(), kSize); + client.DeleteObject(bucket_name(), object_name, + storage::Generation(metadata->generation())); +} + +TEST_F(AsyncClientIntegrationTest, OpenExceedMaximumRange) { + if (!UsingEmulator()) GTEST_SKIP(); + auto async = + AsyncClient(TestOptions().set(1024)); + auto client = MakeIntegrationTestClient(true, TestOptions()); + auto object_name = MakeRandomObjectName(); + + auto create = client.CreateBucket( + bucket_name(), storage::BucketMetadata{}.set_location("us-west4")); + if (!create && create.status().code() != StatusCode::kAlreadyExists) { + GTEST_FAIL() << "cannot create bucket: " << create.status(); + } + + auto constexpr kSize = 2048; + auto const block = MakeRandomData(kSize); + + auto w = + async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) + .get(); + ASSERT_STATUS_OK(w); + AsyncWriter writer; + AsyncToken token; + std::tie(writer, token) = *std::move(w); + auto p = writer.Write(std::move(token), WritePayload(block)).get(); + ASSERT_STATUS_OK(p); + token = *std::move(p); + + auto metadata = writer.Finalize(std::move(token)).get(); + ASSERT_STATUS_OK(metadata); + + auto spec = google::storage::v2::BidiReadObjectSpec{}; + spec.set_bucket(BucketName(bucket_name()).FullName()); + spec.set_object(object_name); + auto descriptor = async.Open(spec).get(); + ASSERT_STATUS_OK(descriptor); + + AsyncReader r0; + AsyncToken t0; + auto actual0 = std::string{}; + std::tie(r0, t0) = descriptor->Read(0, kSize); + while (t0.valid()) { + auto read = r0.Read(std::move(t0)).get(); + ASSERT_STATUS_OK(read); + ReadPayload p; + AsyncToken t; + std::tie(p, t) = *std::move(read); + for (auto sv : p.contents()) actual0 += std::string(sv); + t0 = std::move(t); + } + + EXPECT_EQ(actual0.size(), kSize); + client.DeleteObject(bucket_name(), object_name, + storage::Generation(metadata->generation())); } } // namespace @@ -417,4 +1065,4 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace cloud } // namespace google -#endif // GOOGLE_CLOUD_CPP_STORAGE_HAVE_GRPC \ No newline at end of file +#endif // GOOGLE_CLOUD_CPP_STORAGE_HAVE_GRPC From b5077ac28002e8b5c3b543fc75257d19c44af02a Mon Sep 17 00:00:00 2001 From: Vaibhav Pratap Date: Wed, 13 May 2026 09:22:49 +0000 Subject: [PATCH 3/3] resolve ai comments --- google/cloud/storage/internal/object_requests.cc | 2 +- google/cloud/storage/well_known_parameters.h | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/google/cloud/storage/internal/object_requests.cc b/google/cloud/storage/internal/object_requests.cc index 07d5a0c95c40a..5ca59b9060e2d 100644 --- a/google/cloud/storage/internal/object_requests.cc +++ b/google/cloud/storage/internal/object_requests.cc @@ -379,7 +379,7 @@ std::string ComposeObjectRequest::JsonPayload() const { source_object_list.emplace_back(std::move(source_object_json)); } compose_object_payload_json["sourceObjects"] = source_object_list; - if (HasOption() && GetOption().value()) { + if (GetOption().value_or(false)) { compose_object_payload_json["deleteSourceObjects"] = true; } diff --git a/google/cloud/storage/well_known_parameters.h b/google/cloud/storage/well_known_parameters.h index 5493f4f4874eb..9eb8774ca726c 100644 --- a/google/cloud/storage/well_known_parameters.h +++ b/google/cloud/storage/well_known_parameters.h @@ -605,6 +605,9 @@ struct ReturnPartialSuccess } }; +/** + * Controls if the source objects should be deleted after a successful compose. + */ struct DeleteSourceObjects : public internal::WellKnownParameter { using WellKnownParameter::WellKnownParameter;