From e3deabaa3538e60d25bd8fd824396d9f71dfd5eb Mon Sep 17 00:00:00 2001 From: Kunwoo Park Date: Mon, 8 Jun 2026 14:56:53 -0700 Subject: [PATCH 1/3] fix(workflow-core): paginate S3 deleteDirectory deletions deleteDirectory issued a single listObjectsV2 call (capped at 1000 keys per page) and one deleteObjects batch, so any objects beyond the first 1000 under a prefix were silently orphaned. AWS DeleteObjects also accepts at most 1000 keys per request, and reports per-key failures in its response rather than throwing. List via listObjectsV2Paginator, which transparently follows the continuation token across all pages, deleting keys in batches of at most 1000. Inspect each DeleteObjects response and raise if any key failed, so partial deletions are no longer swallowed. Also remove the unused MD5 computation (digested over the request's toString and never used). Closes #5281 --- .../texera/service/util/S3StorageClient.scala | 73 +++++++++-------- .../service/util/S3StorageClientSpec.scala | 80 +++++++++++++++++++ 2 files changed, 118 insertions(+), 35 deletions(-) diff --git a/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala b/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala index 148205a6812..bc6fc7bbe3d 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala @@ -27,7 +27,6 @@ import software.amazon.awssdk.services.s3.{S3Client, S3Configuration} import software.amazon.awssdk.core.sync.RequestBody import java.io.InputStream -import java.security.MessageDigest import scala.jdk.CollectionConverters._ /** @@ -40,6 +39,9 @@ object S3StorageClient { val MAXIMUM_NUM_OF_MULTIPART_S3_PARTS = 10_000 //Keep on sync with LakeFS https://github.com/treeverse/lakeFS/pull/10180 val PHYSICAL_ADDRESS_EXPIRATION_TIME_HRS = 24 + // AWS S3 DeleteObjects accepts at most 1000 keys per request (listObjectsV2 also + // returns at most 1000 keys per page). + val MAX_KEYS_PER_DELETE_REQUEST = 1000 // Initialize MinIO-compatible S3 Client private lazy val s3Client: S3Client = { @@ -106,41 +108,42 @@ object S3StorageClient { // Ensure the directory prefix ends with `/` to avoid accidental deletions val prefix = if (directoryPrefix.endsWith("/")) directoryPrefix else directoryPrefix + "/" - // List objects under the given prefix - val listRequest = ListObjectsV2Request - .builder() - .bucket(bucketName) - .prefix(prefix) - .build() - - val listResponse = s3Client.listObjectsV2(listRequest) - - // Extract object keys - val objectKeys = listResponse.contents().asScala.map(_.key()) - - if (objectKeys.nonEmpty) { - val objectsToDelete = - objectKeys.map(key => ObjectIdentifier.builder().key(key).build()).asJava - - val deleteRequest = Delete - .builder() - .objects(objectsToDelete) - .build() - - // Compute MD5 checksum for MinIO if required - val md5Hash = MessageDigest - .getInstance("MD5") - .digest(deleteRequest.toString.getBytes("UTF-8")) - - // Convert object keys to S3 DeleteObjectsRequest format - val deleteObjectsRequest = DeleteObjectsRequest - .builder() - .bucket(bucketName) - .delete(deleteRequest) - .build() + val listRequest = ListObjectsV2Request.builder().bucket(bucketName).prefix(prefix).build() + + // listObjectsV2Paginator transparently follows the continuation token across all + // pages (listObjectsV2 returns at most 1000 keys per page). Keys are grouped into + // batches of at most MAX_KEYS_PER_DELETE_REQUEST, the DeleteObjects per-request limit. + s3Client + .listObjectsV2Paginator(listRequest) + .contents() + .asScala + .iterator + .map(obj => ObjectIdentifier.builder().key(obj.key()).build()) + .grouped(MAX_KEYS_PER_DELETE_REQUEST) + .foreach { batch => + val response = s3Client.deleteObjects( + DeleteObjectsRequest + .builder() + .bucket(bucketName) + .delete(Delete.builder().objects(batch.asJava).build()) + .build() + ) + throwOnDeleteErrors(prefix, response) + } + } - // Perform batch deletion - s3Client.deleteObjects(deleteObjectsRequest) + /** + * The DeleteObjects API reports per-key failures in its response instead of throwing, + * so an unchecked response can silently leave objects behind. Raise if any key in the + * batch failed to delete. + */ + private[util] def throwOnDeleteErrors(prefix: String, response: DeleteObjectsResponse): Unit = { + val failed = response.errors().asScala + if (failed.nonEmpty) { + throw new RuntimeException( + s"Failed to delete ${failed.size} object(s) under prefix '$prefix': " + + failed.map(error => s"${error.key()} (${error.code()})").mkString(", ") + ) } } diff --git a/common/workflow-core/src/test/scala/org/apache/texera/service/util/S3StorageClientSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/service/util/S3StorageClientSpec.scala index a1662cf8c3f..805089be08c 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/service/util/S3StorageClientSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/service/util/S3StorageClientSpec.scala @@ -21,8 +21,12 @@ package org.apache.texera.service.util import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.scalatest.funsuite.AnyFunSuite +import software.amazon.awssdk.services.s3.model.{DeleteObjectsResponse, S3Error} import java.io.ByteArrayInputStream +import java.util.concurrent.Executors +import scala.concurrent.duration._ +import scala.concurrent.{Await, ExecutionContext, Future} import scala.util.Random class S3StorageClientSpec @@ -334,4 +338,80 @@ class S3StorageClientSpec S3StorageClient.deleteObject(testBucketName, objectKey) } + + // ======================================== + // deleteDirectory Tests + // ======================================== + + test("deleteDirectory should delete all objects under a prefix") { + val prefix = "delete-dir/small" + val keys = (0 until 5).map(i => s"$prefix/object-$i.txt") + keys.foreach(key => + S3StorageClient.uploadObject(testBucketName, key, createInputStream("data")) + ) + + assert(S3StorageClient.directoryExists(testBucketName, prefix)) + + S3StorageClient.deleteDirectory(testBucketName, prefix) + + assert(!S3StorageClient.directoryExists(testBucketName, prefix)) + } + + test("deleteDirectory should delete more than 1000 objects under a prefix") { + // listObjectsV2 returns at most 1000 keys per page and DeleteObjects accepts at + // most 1000 keys per request. Exceeding 1000 objects exercises both the + // continuation-token pagination loop and the batching of deletions; without them + // only the first 1000 objects are removed and the rest are silently orphaned. + val prefix = "delete-dir/large" + val objectCount = 1001 + + // Upload concurrently to keep the test reasonably fast. + val pool = Executors.newFixedThreadPool(16) + implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool) + try { + val uploads = (0 until objectCount).map { i => + Future { + S3StorageClient.uploadObject( + testBucketName, + f"$prefix/object-$i%05d.txt", + createInputStream("") + ) + } + } + Await.result(Future.sequence(uploads), 5.minutes) + } finally { + pool.shutdown() + } + + assert(S3StorageClient.directoryExists(testBucketName, prefix)) + + S3StorageClient.deleteDirectory(testBucketName, prefix) + + assert(!S3StorageClient.directoryExists(testBucketName, prefix)) + } + + test("deleteDirectory should not throw for a prefix with no objects") { + // Nothing to delete: the listing is empty, so no DeleteObjects request is issued. + S3StorageClient.deleteDirectory(testBucketName, "delete-dir/non-existent") + } + + test("deleteDirectory should surface per-key delete failures rather than swallow them") { + // DeleteObjects returns failed keys in its response instead of throwing, so the + // client must inspect them; otherwise partial deletions become silent storage leaks. + // (Provoking a real per-key failure needs object-lock/retention setup, so the + // response-handling is verified directly here.) + val responseWithError = DeleteObjectsResponse + .builder() + .errors(S3Error.builder().key("delete-dir/locked.txt").code("AccessDenied").build()) + .build() + + val thrown = intercept[RuntimeException] { + S3StorageClient.throwOnDeleteErrors("delete-dir/", responseWithError) + } + assert(thrown.getMessage.contains("delete-dir/locked.txt")) + assert(thrown.getMessage.contains("AccessDenied")) + + // A response with no errors must not throw. + S3StorageClient.throwOnDeleteErrors("delete-dir/", DeleteObjectsResponse.builder().build()) + } } From 7044ff62a48bf1805339d84b225a6eda7886fbea Mon Sep 17 00:00:00 2001 From: Kunwoo Park Date: Mon, 8 Jun 2026 15:22:00 -0700 Subject: [PATCH 2/3] fix(workflow-core): bound delete-error message; refresh deleteDirectory docs - Cap throwOnDeleteErrors enumeration at MAX_LISTED_DELETE_ERRORS (10) and summarize the remainder, so a fully-failed batch can't produce an unbounded message. - Update the LargeBinaryManager doc that still described the pre-pagination single-page (<=1000) behavior of deleteDirectory. --- .../service/util/LargeBinaryManager.scala | 4 ++-- .../texera/service/util/S3StorageClient.scala | 11 ++++++++++- .../service/util/S3StorageClientSpec.scala | 18 ++++++++++++++++++ 3 files changed, 30 insertions(+), 3 deletions(-) diff --git a/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala b/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala index 2fa4acb5302..e9ce3285fb9 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala @@ -75,8 +75,8 @@ object LargeBinaryManager extends LazyLogging { } /** - * Deletes all large binaries for one execution. Uses deleteDirectory, which removes one - * listing page (<= 1000 objects) — enough for expected counts; more needs a paginated delete. + * Deletes all large binaries for one execution. Uses deleteDirectory, which paginates over + * every object under the prefix (no per-page limit), so all binaries for the execution are removed. * * @param executionId the execution whose large binaries should be removed */ diff --git a/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala b/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala index bc6fc7bbe3d..f96043d33d4 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala @@ -42,6 +42,10 @@ object S3StorageClient { // AWS S3 DeleteObjects accepts at most 1000 keys per request (listObjectsV2 also // returns at most 1000 keys per page). val MAX_KEYS_PER_DELETE_REQUEST = 1000 + // A failed DeleteObjects batch can report up to MAX_KEYS_PER_DELETE_REQUEST per-key + // errors; only enumerate this many in the exception message and summarize the rest so + // the message stays bounded. + private[util] val MAX_LISTED_DELETE_ERRORS = 10 // Initialize MinIO-compatible S3 Client private lazy val s3Client: S3Client = { @@ -140,9 +144,14 @@ object S3StorageClient { private[util] def throwOnDeleteErrors(prefix: String, response: DeleteObjectsResponse): Unit = { val failed = response.errors().asScala if (failed.nonEmpty) { + val listed = failed.take(MAX_LISTED_DELETE_ERRORS).map(e => s"${e.key()} (${e.code()})") + val summary = + if (failed.size > MAX_LISTED_DELETE_ERRORS) + s" (and ${failed.size - MAX_LISTED_DELETE_ERRORS} more)" + else "" throw new RuntimeException( s"Failed to delete ${failed.size} object(s) under prefix '$prefix': " + - failed.map(error => s"${error.key()} (${error.code()})").mkString(", ") + listed.mkString(", ") + summary ) } } diff --git a/common/workflow-core/src/test/scala/org/apache/texera/service/util/S3StorageClientSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/service/util/S3StorageClientSpec.scala index 805089be08c..9ec5f4833d2 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/service/util/S3StorageClientSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/service/util/S3StorageClientSpec.scala @@ -413,5 +413,23 @@ class S3StorageClientSpec // A response with no errors must not throw. S3StorageClient.throwOnDeleteErrors("delete-dir/", DeleteObjectsResponse.builder().build()) + + // When many keys fail, the message reports the true total but only enumerates the first + // MAX_LISTED_DELETE_ERRORS keys and summarizes the rest, so it stays bounded. + val cap = S3StorageClient.MAX_LISTED_DELETE_ERRORS + val errorCount = cap + 5 + val manyErrors = (0 until errorCount).map(i => + S3Error.builder().key(f"delete-dir/locked-$i%02d.txt").code("AccessDenied").build() + ) + val thrownMany = intercept[RuntimeException] { + S3StorageClient.throwOnDeleteErrors( + "delete-dir/", + DeleteObjectsResponse.builder().errors(manyErrors: _*).build() + ) + } + assert(thrownMany.getMessage.contains(s"$errorCount object(s)")) + assert(thrownMany.getMessage.contains(f"delete-dir/locked-00.txt")) // first key is listed + assert(!thrownMany.getMessage.contains(f"delete-dir/locked-$cap%02d.txt")) // capped key is not + assert(thrownMany.getMessage.contains(s"and ${errorCount - cap} more")) } } From 5c644e168b4ee3729243681717a14ac2afa3746d Mon Sep 17 00:00:00 2001 From: Kunwoo Park Date: Mon, 8 Jun 2026 15:26:57 -0700 Subject: [PATCH 3/3] docs(workflow-core): simplify comments around deleteDirectory pagination --- .../texera/service/util/LargeBinaryManager.scala | 4 ++-- .../texera/service/util/S3StorageClient.scala | 16 +++++----------- .../service/util/S3StorageClientSpec.scala | 15 ++++----------- 3 files changed, 11 insertions(+), 24 deletions(-) diff --git a/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala b/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala index e9ce3285fb9..df61981252d 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala @@ -75,8 +75,8 @@ object LargeBinaryManager extends LazyLogging { } /** - * Deletes all large binaries for one execution. Uses deleteDirectory, which paginates over - * every object under the prefix (no per-page limit), so all binaries for the execution are removed. + * Deletes all large binaries for one execution via deleteDirectory, which removes every object + * under the prefix. * * @param executionId the execution whose large binaries should be removed */ diff --git a/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala b/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala index f96043d33d4..15ddfb027a9 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala @@ -39,12 +39,9 @@ object S3StorageClient { val MAXIMUM_NUM_OF_MULTIPART_S3_PARTS = 10_000 //Keep on sync with LakeFS https://github.com/treeverse/lakeFS/pull/10180 val PHYSICAL_ADDRESS_EXPIRATION_TIME_HRS = 24 - // AWS S3 DeleteObjects accepts at most 1000 keys per request (listObjectsV2 also - // returns at most 1000 keys per page). + // S3 DeleteObjects accepts at most 1000 keys per request. val MAX_KEYS_PER_DELETE_REQUEST = 1000 - // A failed DeleteObjects batch can report up to MAX_KEYS_PER_DELETE_REQUEST per-key - // errors; only enumerate this many in the exception message and summarize the rest so - // the message stays bounded. + // Cap how many failed keys are listed in the error message. private[util] val MAX_LISTED_DELETE_ERRORS = 10 // Initialize MinIO-compatible S3 Client @@ -114,9 +111,7 @@ object S3StorageClient { val listRequest = ListObjectsV2Request.builder().bucket(bucketName).prefix(prefix).build() - // listObjectsV2Paginator transparently follows the continuation token across all - // pages (listObjectsV2 returns at most 1000 keys per page). Keys are grouped into - // batches of at most MAX_KEYS_PER_DELETE_REQUEST, the DeleteObjects per-request limit. + // Paginate across all pages, then delete in batches within the per-request key limit. s3Client .listObjectsV2Paginator(listRequest) .contents() @@ -137,9 +132,8 @@ object S3StorageClient { } /** - * The DeleteObjects API reports per-key failures in its response instead of throwing, - * so an unchecked response can silently leave objects behind. Raise if any key in the - * batch failed to delete. + * DeleteObjects reports per-key failures in its response instead of throwing; + * raise if any key failed. */ private[util] def throwOnDeleteErrors(prefix: String, response: DeleteObjectsResponse): Unit = { val failed = response.errors().asScala diff --git a/common/workflow-core/src/test/scala/org/apache/texera/service/util/S3StorageClientSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/service/util/S3StorageClientSpec.scala index 9ec5f4833d2..cb2db865876 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/service/util/S3StorageClientSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/service/util/S3StorageClientSpec.scala @@ -358,10 +358,7 @@ class S3StorageClientSpec } test("deleteDirectory should delete more than 1000 objects under a prefix") { - // listObjectsV2 returns at most 1000 keys per page and DeleteObjects accepts at - // most 1000 keys per request. Exceeding 1000 objects exercises both the - // continuation-token pagination loop and the batching of deletions; without them - // only the first 1000 objects are removed and the rest are silently orphaned. + // >1000 objects exercises pagination and delete batching; without them the tail is orphaned. val prefix = "delete-dir/large" val objectCount = 1001 @@ -391,15 +388,12 @@ class S3StorageClientSpec } test("deleteDirectory should not throw for a prefix with no objects") { - // Nothing to delete: the listing is empty, so no DeleteObjects request is issued. + // Empty listing: no DeleteObjects request is issued. S3StorageClient.deleteDirectory(testBucketName, "delete-dir/non-existent") } test("deleteDirectory should surface per-key delete failures rather than swallow them") { - // DeleteObjects returns failed keys in its response instead of throwing, so the - // client must inspect them; otherwise partial deletions become silent storage leaks. - // (Provoking a real per-key failure needs object-lock/retention setup, so the - // response-handling is verified directly here.) + // A real per-key failure needs object-lock setup, so verify response handling directly. val responseWithError = DeleteObjectsResponse .builder() .errors(S3Error.builder().key("delete-dir/locked.txt").code("AccessDenied").build()) @@ -414,8 +408,7 @@ class S3StorageClientSpec // A response with no errors must not throw. S3StorageClient.throwOnDeleteErrors("delete-dir/", DeleteObjectsResponse.builder().build()) - // When many keys fail, the message reports the true total but only enumerates the first - // MAX_LISTED_DELETE_ERRORS keys and summarizes the rest, so it stays bounded. + // Many failures: message reports the true total but only lists up to the cap. val cap = S3StorageClient.MAX_LISTED_DELETE_ERRORS val errorCount = cap + 5 val manyErrors = (0 until errorCount).map(i =>