Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ object LargeBinaryManager extends LazyLogging {
}

/**
* Deletes all large binaries for one execution. Uses deleteDirectory, which removes one
* listing page (<= 1000 objects) — enough for expected counts; more needs a paginated delete.
* Deletes all large binaries for one execution via deleteDirectory, which removes every object
* under the prefix.
*
* @param executionId the execution whose large binaries should be removed
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import software.amazon.awssdk.services.s3.{S3Client, S3Configuration}
import software.amazon.awssdk.core.sync.RequestBody

import java.io.InputStream
import java.security.MessageDigest
import scala.jdk.CollectionConverters._

/**
Expand All @@ -40,6 +39,10 @@ object S3StorageClient {
val MAXIMUM_NUM_OF_MULTIPART_S3_PARTS = 10_000
// Keep in sync with LakeFS https://github.com/treeverse/lakeFS/pull/10180
val PHYSICAL_ADDRESS_EXPIRATION_TIME_HRS = 24
// S3 DeleteObjects accepts at most 1000 keys per request.
val MAX_KEYS_PER_DELETE_REQUEST = 1000
// Cap how many failed keys are listed in the error message.
private[util] val MAX_LISTED_DELETE_ERRORS = 10

// Initialize MinIO-compatible S3 Client
private lazy val s3Client: S3Client = {
Expand Down Expand Up @@ -106,41 +109,44 @@ object S3StorageClient {
// Ensure the directory prefix ends with `/` to avoid accidental deletions
val prefix = if (directoryPrefix.endsWith("/")) directoryPrefix else directoryPrefix + "/"

// List objects under the given prefix
val listRequest = ListObjectsV2Request
.builder()
.bucket(bucketName)
.prefix(prefix)
.build()

val listResponse = s3Client.listObjectsV2(listRequest)

// Extract object keys
val objectKeys = listResponse.contents().asScala.map(_.key())

if (objectKeys.nonEmpty) {
val objectsToDelete =
objectKeys.map(key => ObjectIdentifier.builder().key(key).build()).asJava

val deleteRequest = Delete
.builder()
.objects(objectsToDelete)
.build()

// Compute MD5 checksum for MinIO if required
val md5Hash = MessageDigest
.getInstance("MD5")
.digest(deleteRequest.toString.getBytes("UTF-8"))

// Convert object keys to S3 DeleteObjectsRequest format
val deleteObjectsRequest = DeleteObjectsRequest
.builder()
.bucket(bucketName)
.delete(deleteRequest)
.build()
val listRequest = ListObjectsV2Request.builder().bucket(bucketName).prefix(prefix).build()

// Paginate across all pages, then delete in batches within the per-request key limit.
s3Client
.listObjectsV2Paginator(listRequest)
.contents()
.asScala
.iterator
.map(obj => ObjectIdentifier.builder().key(obj.key()).build())
.grouped(MAX_KEYS_PER_DELETE_REQUEST)
.foreach { batch =>
val response = s3Client.deleteObjects(
DeleteObjectsRequest
.builder()
.bucket(bucketName)
.delete(Delete.builder().objects(batch.asJava).build())
.build()
)
throwOnDeleteErrors(prefix, response)
}
}

// Perform batch deletion
s3Client.deleteObjects(deleteObjectsRequest)
/**
 * Turns per-key failures carried in a DeleteObjects response into an exception.
 * The response lists failed keys in `errors()` rather than the call throwing, so the
 * list is inspected here and a RuntimeException is raised when it is non-empty.
 *
 * @param prefix   the directory prefix being deleted; only used in the error message
 * @param response the DeleteObjects response to inspect
 * @throws RuntimeException if the response reports at least one failed key; the message
 *                          names at most MAX_LISTED_DELETE_ERRORS keys and counts the rest
 */
private[util] def throwOnDeleteErrors(prefix: String, response: DeleteObjectsResponse): Unit = {
  val errors = response.errors().asScala
  val total = errors.size
  if (total > 0) {
    // Only the first few failures are spelled out to keep the message bounded.
    val shown = errors.iterator
      .take(MAX_LISTED_DELETE_ERRORS)
      .map(err => s"${err.key()} (${err.code()})")
      .mkString(", ")
    val hidden = total - MAX_LISTED_DELETE_ERRORS
    val suffix = if (hidden > 0) s" (and $hidden more)" else ""
    throw new RuntimeException(
      s"Failed to delete $total object(s) under prefix '$prefix': " + shown + suffix
    )
  }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@ package org.apache.texera.service.util

import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.scalatest.funsuite.AnyFunSuite
import software.amazon.awssdk.services.s3.model.{DeleteObjectsResponse, S3Error}

import java.io.ByteArrayInputStream
import java.util.concurrent.Executors
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.Random

class S3StorageClientSpec
Expand Down Expand Up @@ -334,4 +338,91 @@ class S3StorageClientSpec

S3StorageClient.deleteObject(testBucketName, objectKey)
}

// ========================================
// deleteDirectory Tests
// ========================================

test("deleteDirectory should delete all objects under a prefix") {
  // Seed a handful of objects under one prefix, then expect the prefix to be empty afterwards.
  val dir = "delete-dir/small"
  for (i <- 0 until 5) {
    S3StorageClient.uploadObject(testBucketName, s"$dir/object-$i.txt", createInputStream("data"))
  }

  val existedBefore = S3StorageClient.directoryExists(testBucketName, dir)
  assert(existedBefore)

  S3StorageClient.deleteDirectory(testBucketName, dir)

  val existsAfter = S3StorageClient.directoryExists(testBucketName, dir)
  assert(!existsAfter)
}

test("deleteDirectory should delete more than 1000 objects under a prefix") {
  // One object past the 1000 mark, so a delete that handles only the first 1000 keys
  // would leave an object behind and fail the final assertion.
  val prefix = "delete-dir/large"
  val objectCount = 1001

  // Uploads run on a dedicated 16-thread pool so seeding the bucket stays reasonably quick.
  val workers = Executors.newFixedThreadPool(16)
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(workers)
  try {
    val pending =
      for (idx <- 0 until objectCount)
        yield Future {
          S3StorageClient.uploadObject(
            testBucketName,
            f"$prefix/object-$idx%05d.txt",
            createInputStream("")
          )
        }
    Await.result(Future.sequence(pending), 5.minutes)
  } finally {
    // Always release the pool, even when an upload fails or the wait times out.
    workers.shutdown()
  }

  val existedBefore = S3StorageClient.directoryExists(testBucketName, prefix)
  assert(existedBefore)

  S3StorageClient.deleteDirectory(testBucketName, prefix)

  val existsAfter = S3StorageClient.directoryExists(testBucketName, prefix)
  assert(!existsAfter)
}

test("deleteDirectory should not throw for a prefix with no objects") {
  // Nothing is uploaded under this prefix; the call passing without an exception is the check.
  val missingPrefix = "delete-dir/non-existent"
  S3StorageClient.deleteDirectory(testBucketName, missingPrefix)
}

test("deleteDirectory should surface per-key delete failures rather than swallow them") {
  // The response handler is exercised directly with hand-built responses instead of
  // provoking a real per-key failure against the store.
  val lockedError =
    S3Error.builder().key("delete-dir/locked.txt").code("AccessDenied").build()
  val singleFailure = DeleteObjectsResponse.builder().errors(lockedError).build()

  val singleThrown = intercept[RuntimeException] {
    S3StorageClient.throwOnDeleteErrors("delete-dir/", singleFailure)
  }
  val singleMessage = singleThrown.getMessage
  assert(singleMessage.contains("delete-dir/locked.txt"))
  assert(singleMessage.contains("AccessDenied"))

  // An error-free response passes through silently.
  val cleanResponse = DeleteObjectsResponse.builder().build()
  S3StorageClient.throwOnDeleteErrors("delete-dir/", cleanResponse)

  // More failures than the listing cap: the total is exact, the listing is truncated.
  val cap = S3StorageClient.MAX_LISTED_DELETE_ERRORS
  val errorCount = cap + 5
  val manyErrors =
    for (i <- 0 until errorCount)
      yield S3Error.builder().key(f"delete-dir/locked-$i%02d.txt").code("AccessDenied").build()
  val manyFailures = DeleteObjectsResponse.builder().errors(manyErrors: _*).build()

  val manyThrown = intercept[RuntimeException] {
    S3StorageClient.throwOnDeleteErrors("delete-dir/", manyFailures)
  }
  val manyMessage = manyThrown.getMessage
  assert(manyMessage.contains(s"$errorCount object(s)"))
  assert(manyMessage.contains("delete-dir/locked-00.txt")) // first key is listed
  assert(!manyMessage.contains(f"delete-dir/locked-$cap%02d.txt")) // first key past the cap is not
  assert(manyMessage.contains(s"and ${errorCount - cap} more"))
}
}
Loading