diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala
index 09fa6f3eb30..14737b98001 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala
@@ -35,6 +35,9 @@ import scala.jdk.CollectionConverters._
   */
 object LakeFSStorageClient {
 
+  // Largest page size requested per LakeFS API call; also the default for paginated listings
+  private val PageSize = 1000
+
   private lazy val apiClient: ApiClient = {
     val client = new ApiClient()
     client.setApiKey(StorageConfig.lakefsPassword)
@@ -300,8 +303,67 @@ object LakeFSStorageClient {
       .sortBy(_.getCreationDate)(Ordering[java.lang.Long].reverse) // Sort in descending order
   }
 
+  /**
+    * Fetches all pages from a paginated LakeFS API call.
+    *
+    * @param fetch A function that takes a pagination cursor (empty for the first page)
+    *              and returns (results, pagination).
+    * @return All results across all pages, in the order the pages were returned.
+    * @throws IllegalStateException if LakeFS reports more results without advancing the cursor.
+    */
+  private def fetchAllPages[T](
+      fetch: String => (java.util.List[T], Pagination)
+  ): List[T] = {
+    val allResults = scala.collection.mutable.ListBuffer[T]()
+    var hasMore = true
+    var after = "" // Pagination cursor returned by LakeFS
+
+    while (hasMore) {
+      val (results, pagination) = fetch(after)
+      allResults ++= results.asScala
+      hasMore = pagination.getHasMore
+      if (hasMore) {
+        val next = pagination.getNextOffset
+        // Fail fast: a missing or unchanged cursor would re-request the same page forever
+        if (next == null || next.isEmpty || next == after) {
+          throw new IllegalStateException(
+            s"LakeFS pagination did not advance past cursor '$after'"
+          )
+        }
+        after = next
+      }
+    }
+
+    allResults.toList
+  }
+
+  /** Rejects page sizes outside the range this client supports, i.e. [1, PageSize]. */
+  private def requireValidPageSize(pageSize: Int): Unit =
+    require(
+      pageSize >= 1 && pageSize <= PageSize,
+      s"pageSize must be in [1, $PageSize], got $pageSize"
+    )
+
+  /**
+    * Lists every object of a repository at the given commit.
+    *
+    * @param repoName   Repository name.
+    * @param commitHash Commit (ref) whose objects are listed.
+    * @param pageSize   Number of results requested per LakeFS call; must be in [1, PageSize].
+    * @return All objects, gathered across as many pages as needed.
+    */
-  def retrieveObjectsOfVersion(repoName: String, commitHash: String): List[ObjectStats] = {
-    objectsApi.listObjects(repoName, commitHash).execute().getResults.asScala.toList
+  def retrieveObjectsOfVersion(
+      repoName: String,
+      commitHash: String,
+      pageSize: Int = PageSize
+  ): List[ObjectStats] = {
+    requireValidPageSize(pageSize)
+    fetchAllPages[ObjectStats] { after =>
+      val request = objectsApi.listObjects(repoName, commitHash).amount(pageSize)
+      if (after.nonEmpty) request.after(after)
+      val response = request.execute()
+      (response.getResults, response.getPagination)
+    }
   }
 
   def retrieveRepositorySize(repoName: String, commitHash: String = ""): Long = {
@@ -334,12 +396,14 @@ object LakeFSStorageClient {
     * @return List of uncommitted object stats.
+    * @param pageSize Number of results requested per LakeFS call; must be in [1, PageSize].
     */
-  def retrieveUncommittedObjects(repoName: String): List[Diff] = {
-    branchesApi
-      .diffBranch(repoName, branchName)
-      .execute()
-      .getResults
-      .asScala
-      .toList
+  def retrieveUncommittedObjects(repoName: String, pageSize: Int = PageSize): List[Diff] = {
+    requireValidPageSize(pageSize)
+    fetchAllPages[Diff] { after =>
+      val request = branchesApi.diffBranch(repoName, branchName).amount(pageSize)
+      if (after.nonEmpty) request.after(after)
+      val response = request.execute()
+      (response.getResults, response.getPagination)
+    }
   }
 
   def createCommit(repoName: String, branch: String, commitMessage: String): Commit = {
diff --git a/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala b/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala
index a71861491a6..7c94dc8fe90 100644
--- a/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala
+++ b/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala
@@ -2445,4 +2445,40 @@ class DatasetResourceSpec
     fetchSession(filePath) shouldBe null
     fetchPartRows(uploadId) shouldBe empty
   }
+
+  // ===========================================================================
+  // Pagination test – verify that listing APIs return every item, both when the
+  // item count exceeds the LakeFS default page size (100) and when the results
+  // must be stitched together from several pages
+  // ===========================================================================
+
+  "LakeFS pagination" should "return all files when count exceeds one page for both uncommitted and committed objects" taggedAs Slow in {
+    val repoName =
+      s"pagination-${System.nanoTime()}-${Random.alphanumeric.take(6).mkString.toLowerCase}"
+    LakeFSStorageClient.initRepo(repoName)
+
+    val totalFiles = 110
+    // Small enough that 110 files span several pages, so the cursor loop really runs
+    val smallPageSize = 25
+    (1 to totalFiles).foreach { i =>
+      LakeFSStorageClient.writeFileToRepo(
+        repoName,
+        s"file-$i.txt",
+        new ByteArrayInputStream(s"content-$i".getBytes(StandardCharsets.UTF_8))
+      )
+    }
+
+    // before commit: 110 files should appear as uncommitted diffs
+    LakeFSStorageClient.retrieveUncommittedObjects(repoName).size shouldEqual totalFiles
+    val pagedDiffs =
+      LakeFSStorageClient.retrieveUncommittedObjects(repoName, pageSize = smallPageSize)
+    pagedDiffs.size shouldEqual totalFiles
+
+    // after commit: 110 files should appear as committed objects
+    val commit = LakeFSStorageClient.withCreateVersion(repoName, "commit all files") {}
+    LakeFSStorageClient.retrieveObjectsOfVersion(repoName, commit.getId).size shouldEqual totalFiles
+    val pagedObjects =
+      LakeFSStorageClient.retrieveObjectsOfVersion(repoName, commit.getId, pageSize = smallPageSize)
+    pagedObjects.size shouldEqual totalFiles
+  }
 }