diff --git a/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala b/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala index 2fa4acb5302..df61981252d 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/service/util/LargeBinaryManager.scala @@ -75,8 +75,8 @@ object LargeBinaryManager extends LazyLogging { } /** - * Deletes all large binaries for one execution. Uses deleteDirectory, which removes one - * listing page (<= 1000 objects) — enough for expected counts; more needs a paginated delete. + * Deletes all large binaries for one execution via deleteDirectory, which removes every object + * under the prefix. * * @param executionId the execution whose large binaries should be removed */ diff --git a/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala b/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala index 148205a6812..15ddfb027a9 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala @@ -27,7 +27,6 @@ import software.amazon.awssdk.services.s3.{S3Client, S3Configuration} import software.amazon.awssdk.core.sync.RequestBody import java.io.InputStream -import java.security.MessageDigest import scala.jdk.CollectionConverters._ /** @@ -40,6 +39,10 @@ object S3StorageClient { val MAXIMUM_NUM_OF_MULTIPART_S3_PARTS = 10_000 //Keep on sync with LakeFS https://github.com/treeverse/lakeFS/pull/10180 val PHYSICAL_ADDRESS_EXPIRATION_TIME_HRS = 24 + // S3 DeleteObjects accepts at most 1000 keys per request. + val MAX_KEYS_PER_DELETE_REQUEST = 1000 + // Cap how many failed keys are listed in the error message. + private[util] val MAX_LISTED_DELETE_ERRORS = 10 // Initialize MinIO-compatible S3 Client private lazy val s3Client: S3Client = { @@ -106,41 +109,44 @@ object S3StorageClient { // Ensure the directory prefix ends with `/` to avoid accidental deletions val prefix = if (directoryPrefix.endsWith("/")) directoryPrefix else directoryPrefix + "/" - // List objects under the given prefix - val listRequest = ListObjectsV2Request - .builder() - .bucket(bucketName) - .prefix(prefix) - .build() - - val listResponse = s3Client.listObjectsV2(listRequest) - - // Extract object keys - val objectKeys = listResponse.contents().asScala.map(_.key()) - - if (objectKeys.nonEmpty) { - val objectsToDelete = - objectKeys.map(key => ObjectIdentifier.builder().key(key).build()).asJava - - val deleteRequest = Delete - .builder() - .objects(objectsToDelete) - .build() - - // Compute MD5 checksum for MinIO if required - val md5Hash = MessageDigest - .getInstance("MD5") - .digest(deleteRequest.toString.getBytes("UTF-8")) - - // Convert object keys to S3 DeleteObjectsRequest format - val deleteObjectsRequest = DeleteObjectsRequest - .builder() - .bucket(bucketName) - .delete(deleteRequest) - .build() + val listRequest = ListObjectsV2Request.builder().bucket(bucketName).prefix(prefix).build() + + // Paginate across all pages, then delete in batches within the per-request key limit. + s3Client + .listObjectsV2Paginator(listRequest) + .contents() + .asScala + .iterator + .map(obj => ObjectIdentifier.builder().key(obj.key()).build()) + .grouped(MAX_KEYS_PER_DELETE_REQUEST) + .foreach { batch => + val response = s3Client.deleteObjects( + DeleteObjectsRequest + .builder() + .bucket(bucketName) + .delete(Delete.builder().objects(batch.asJava).build()) + .build() + ) + throwOnDeleteErrors(prefix, response) + } + } - // Perform batch deletion - s3Client.deleteObjects(deleteObjectsRequest) + /** + * DeleteObjects reports per-key failures in its response instead of throwing; + * raise if any key failed. + */ + private[util] def throwOnDeleteErrors(prefix: String, response: DeleteObjectsResponse): Unit = { + val failed = response.errors().asScala + if (failed.nonEmpty) { + val listed = failed.take(MAX_LISTED_DELETE_ERRORS).map(e => s"${e.key()} (${e.code()})") + val summary = + if (failed.size > MAX_LISTED_DELETE_ERRORS) + s" (and ${failed.size - MAX_LISTED_DELETE_ERRORS} more)" + else "" + throw new RuntimeException( + s"Failed to delete ${failed.size} object(s) under prefix '$prefix': " + + listed.mkString(", ") + summary + ) } } diff --git a/common/workflow-core/src/test/scala/org/apache/texera/service/util/S3StorageClientSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/service/util/S3StorageClientSpec.scala index a1662cf8c3f..cb2db865876 100644 --- a/common/workflow-core/src/test/scala/org/apache/texera/service/util/S3StorageClientSpec.scala +++ b/common/workflow-core/src/test/scala/org/apache/texera/service/util/S3StorageClientSpec.scala @@ -21,8 +21,12 @@ package org.apache.texera.service.util import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.scalatest.funsuite.AnyFunSuite +import software.amazon.awssdk.services.s3.model.{DeleteObjectsResponse, S3Error} import java.io.ByteArrayInputStream +import java.util.concurrent.Executors +import scala.concurrent.duration._ +import scala.concurrent.{Await, ExecutionContext, Future} import scala.util.Random class S3StorageClientSpec @@ -334,4 +338,91 @@ class S3StorageClientSpec S3StorageClient.deleteObject(testBucketName, objectKey) } + + // ======================================== + // deleteDirectory Tests + // ======================================== + + test("deleteDirectory should delete all objects under a prefix") { + val prefix = "delete-dir/small" + val keys = (0 until 5).map(i => s"$prefix/object-$i.txt") + keys.foreach(key => + S3StorageClient.uploadObject(testBucketName, key, createInputStream("data")) + ) + + assert(S3StorageClient.directoryExists(testBucketName, prefix)) + + S3StorageClient.deleteDirectory(testBucketName, prefix) + + assert(!S3StorageClient.directoryExists(testBucketName, prefix)) + } + + test("deleteDirectory should delete more than 1000 objects under a prefix") { + // >1000 objects exercises pagination and delete batching; without them the tail is orphaned. + val prefix = "delete-dir/large" + val objectCount = 1001 + + // Upload concurrently to keep the test reasonably fast. + val pool = Executors.newFixedThreadPool(16) + implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool) + try { + val uploads = (0 until objectCount).map { i => + Future { + S3StorageClient.uploadObject( + testBucketName, + f"$prefix/object-$i%05d.txt", + createInputStream("") + ) + } + } + Await.result(Future.sequence(uploads), 5.minutes) + } finally { + pool.shutdown() + } + + assert(S3StorageClient.directoryExists(testBucketName, prefix)) + + S3StorageClient.deleteDirectory(testBucketName, prefix) + + assert(!S3StorageClient.directoryExists(testBucketName, prefix)) + } + + test("deleteDirectory should not throw for a prefix with no objects") { + // Empty listing: no DeleteObjects request is issued. + S3StorageClient.deleteDirectory(testBucketName, "delete-dir/non-existent") + } + + test("deleteDirectory should surface per-key delete failures rather than swallow them") { + // A real per-key failure needs object-lock setup, so verify response handling directly. + val responseWithError = DeleteObjectsResponse + .builder() + .errors(S3Error.builder().key("delete-dir/locked.txt").code("AccessDenied").build()) + .build() + + val thrown = intercept[RuntimeException] { + S3StorageClient.throwOnDeleteErrors("delete-dir/", responseWithError) + } + assert(thrown.getMessage.contains("delete-dir/locked.txt")) + assert(thrown.getMessage.contains("AccessDenied")) + + // A response with no errors must not throw. + S3StorageClient.throwOnDeleteErrors("delete-dir/", DeleteObjectsResponse.builder().build()) + + // Many failures: message reports the true total but only lists up to the cap. + val cap = S3StorageClient.MAX_LISTED_DELETE_ERRORS + val errorCount = cap + 5 + val manyErrors = (0 until errorCount).map(i => + S3Error.builder().key(f"delete-dir/locked-$i%02d.txt").code("AccessDenied").build() + ) + val thrownMany = intercept[RuntimeException] { + S3StorageClient.throwOnDeleteErrors( + "delete-dir/", + DeleteObjectsResponse.builder().errors(manyErrors: _*).build() + ) + } + assert(thrownMany.getMessage.contains(s"$errorCount object(s)")) + assert(thrownMany.getMessage.contains(f"delete-dir/locked-00.txt")) // first key is listed + assert(!thrownMany.getMessage.contains(f"delete-dir/locked-$cap%02d.txt")) // capped key is not + assert(thrownMany.getMessage.contains(s"and ${errorCount - cap} more")) + } }