diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMHALeaderSpecificACLEnforcement.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMHALeaderSpecificACLEnforcement.java index 43acb0f823d..86d2fe66137 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMHALeaderSpecificACLEnforcement.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMHALeaderSpecificACLEnforcement.java @@ -22,22 +22,31 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS; +import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.PARTIAL_DELETE; +import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.PARTIAL_RENAME; import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.PERMISSION_DENIED; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.concurrent.TimeoutException; +import java.util.function.BooleanSupplier; import org.apache.commons.lang3.RandomStringUtils; +import org.apache.hadoop.hdds.client.OzoneQuota; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl; +import org.apache.hadoop.ozone.OzoneAcl; import org.apache.hadoop.ozone.client.BucketArgs; import org.apache.hadoop.ozone.client.ObjectStore; import org.apache.hadoop.ozone.client.OzoneBucket; @@ -48,8 +57,11 @@ import org.apache.hadoop.ozone.client.VolumeArgs; import org.apache.hadoop.ozone.client.io.OzoneOutputStream; import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.om.helpers.BucketLayout; import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer; import org.apache.hadoop.ozone.security.acl.OzoneNativeAuthorizer; +import org.apache.hadoop.ozone.security.acl.OzoneObj; +import org.apache.hadoop.ozone.security.acl.OzoneObjInfo; import org.apache.hadoop.security.UserGroupInformation; import org.apache.ozone.test.GenericTestUtils; import org.junit.jupiter.api.AfterAll; @@ -111,14 +123,14 @@ public void restoreLeadership() throws IOException, InterruptedException, Timeou OzoneManager currentLeader = cluster.getOMLeader(); if (!currentLeader.getOMNodeId().equals(theLeaderOM.getOMNodeId())) { currentLeader.transferLeadership(theLeaderOM.getOMNodeId()); - GenericTestUtils.waitFor(() -> { + BooleanSupplier leadershipCheck = () -> { try { - OzoneManager currentLeaderCheck = cluster.getOMLeader(); - return !currentLeaderCheck.getOMNodeId().equals(currentLeader.getOMNodeId()); + return !cluster.getOMLeader().getOMNodeId().equals(currentLeader.getOMNodeId()); } catch (Exception e) { return false; } - }, 1000, 30000); + }; + GenericTestUtils.waitFor(leadershipCheck, 1000, 30000); } } @@ -140,20 +152,18 @@ public void testOMHAAdminPrivilegesAfterLeadershipChange() throws Exception { addAdminToSpecificOM(currentLeader, TEST_USER); // Verify admin was added - assertTrue(currentLeader.getOmAdminUsernames().contains(TEST_USER), - "Test user should be admin on leader OM"); + assertThat(currentLeader.getOmAdminUsernames()).contains(TEST_USER); // Step 3: Test volume and bucket creation as test user (should succeed) testVolumeAndBucketCreationAsUser(true); // Step 4: Force leadership transfer to another OM node - OzoneManager newLeader = cluster.transferOMLeadershipToAnotherNode(currentLeader); + OzoneManager newLeader = transferLeadershipToAnotherNode(currentLeader); assertNotEquals(leaderNodeId, newLeader.getOMNodeId(), "Leadership should have transferred to a different node"); // Step 5: Verify test user is NOT admin on new leader - assertTrue(!newLeader.getOmAdminUsernames().contains(TEST_USER), - "Test user should NOT be admin on new leader OM"); + assertThat(newLeader.getOmAdminUsernames()).doesNotContain(TEST_USER); // Step 6: Test volume and bucket creation as test user (should fail) testVolumeAndBucketCreationAsUser(false); @@ -336,8 +346,7 @@ public void testKeySetTimesAclEnforcementAfterLeadershipChange() throws Exceptio addAdminToSpecificOM(currentLeader, TEST_USER); // Verify admin was added - assertTrue(currentLeader.getOmAdminUsernames().contains(TEST_USER), - "Test user should be admin on leader OM"); + assertThat(currentLeader.getOmAdminUsernames()).contains(TEST_USER); // Switch to test user and try setTimes as admin (should succeed) UserGroupInformation.setLoginUser(testUserUgi); @@ -359,8 +368,7 @@ public void testKeySetTimesAclEnforcementAfterLeadershipChange() throws Exceptio OzoneManager newLeader = cluster.transferOMLeadershipToAnotherNode(currentLeader); assertNotEquals(leaderNodeId, newLeader.getOMNodeId(), "Leadership should have transferred to a different node"); - assertFalse(newLeader.getOmAdminUsernames().contains(TEST_USER), - "Test user should NOT be admin on new leader OM"); + assertThat(newLeader.getOmAdminUsernames()).doesNotContain(TEST_USER); long anotherMtime = System.currentTimeMillis() + 10000; OMException exception = assertThrows(OMException.class, () -> { @@ -374,6 +382,623 @@ public void testKeySetTimesAclEnforcementAfterLeadershipChange() throws Exceptio } } + /** + * Tests that setQuota ACL check is enforced in preExecute and is leader-specific. + */ + @Test + public void testVolumeSetQuotaAclEnforcementAfterLeadershipChange() throws Exception { + ObjectStore adminObjectStore = client.getObjectStore(); + String testVolume = "quotavol-" + + RandomStringUtils.secure().nextAlphabetic(5).toLowerCase(Locale.ROOT); + + // Create volume as admin + VolumeArgs volumeArgs = VolumeArgs.newBuilder() + .setOwner(adminUserUgi.getShortUserName()) + .build(); + adminObjectStore.createVolume(testVolume, volumeArgs); + + // Add test user as admin on current leader + OzoneManager currentLeader = cluster.getOMLeader(); + addAdminToSpecificOM(currentLeader, TEST_USER); + assertThat(currentLeader.getOmAdminUsernames()).contains(TEST_USER); + + // Test user should be able to set quota as admin + UserGroupInformation.setLoginUser(testUserUgi); + try (OzoneClient userClient = OzoneClientFactory.getRpcClient(OM_SERVICE_ID, cluster.getConf())) { + ObjectStore userObjectStore = userClient.getObjectStore(); + OzoneVolume userVolume = userObjectStore.getVolume(testVolume); + + OzoneQuota quota1 = OzoneQuota.getOzoneQuota(100L * 1024 * 1024 * 1024, 1000); + userVolume.setQuota(quota1); // Set quota to 100GB + + // Transfer leadership + OzoneManager newLeader = transferLeadershipToAnotherNode(currentLeader); + assertThat(newLeader.getOmAdminUsernames()).doesNotContain(TEST_USER); + + // Should fail on new leader + OzoneQuota quota2 = OzoneQuota.getOzoneQuota(200L * 1024 * 1024 * 1024, 2000); + OMException exception = assertThrows(OMException.class, () -> { + userVolume.setQuota(quota2); + }, "setQuota should fail for non-admin user on new leader"); + assertEquals(PERMISSION_DENIED, exception.getResult()); + } finally { + UserGroupInformation.setLoginUser(adminUserUgi); + } + } + + /** + * Tests that setOwner ACL check is enforced in preExecute and is leader-specific. + */ + @Test + public void testVolumeSetOwnerAclEnforcementAfterLeadershipChange() throws Exception { + ObjectStore adminObjectStore = client.getObjectStore(); + String testVolume = "ownervol-" + + RandomStringUtils.secure().nextAlphabetic(5).toLowerCase(Locale.ROOT); + + // Create volume as admin + VolumeArgs volumeArgs = VolumeArgs.newBuilder() + .setOwner(adminUserUgi.getShortUserName()) + .build(); + adminObjectStore.createVolume(testVolume, volumeArgs); + + // Add test user as admin on current leader + OzoneManager currentLeader = cluster.getOMLeader(); + addAdminToSpecificOM(currentLeader, TEST_USER); + assertThat(currentLeader.getOmAdminUsernames()).contains(TEST_USER); + + // Test user should be able to change owner as admin + UserGroupInformation.setLoginUser(testUserUgi); + try (OzoneClient userClient = OzoneClientFactory.getRpcClient(OM_SERVICE_ID, cluster.getConf())) { + ObjectStore userObjectStore = userClient.getObjectStore(); + OzoneVolume userVolume = userObjectStore.getVolume(testVolume); + + userVolume.setOwner("newowner"); + + // Transfer leadership + OzoneManager newLeader = transferLeadershipToAnotherNode(currentLeader); + assertThat(newLeader.getOmAdminUsernames()).doesNotContain(TEST_USER); + + // Should fail on new leader + OMException exception = assertThrows(OMException.class, () -> { + userVolume.setOwner("anothernewowner"); + }, "setOwner should fail for non-admin user on new leader"); + assertEquals(PERMISSION_DENIED, exception.getResult()); + } finally { + UserGroupInformation.setLoginUser(adminUserUgi); + } + } + + /** + * Tests that deleteVolume ACL check is enforced in preExecute and is leader-specific. + */ + @Test + public void testVolumeDeleteAclEnforcementAfterLeadershipChange() throws Exception { + ObjectStore adminObjectStore = client.getObjectStore(); + String testVolume1 = "delvol1-" + + RandomStringUtils.secure().nextAlphabetic(5).toLowerCase(Locale.ROOT); + String testVolume2 = "delvol2-" + + RandomStringUtils.secure().nextAlphabetic(5).toLowerCase(Locale.ROOT); + + // Create volumes as admin + VolumeArgs volumeArgs = VolumeArgs.newBuilder() + .setOwner(adminUserUgi.getShortUserName()) + .build(); + adminObjectStore.createVolume(testVolume1, volumeArgs); + adminObjectStore.createVolume(testVolume2, volumeArgs); + + // Add test user as admin on current leader + OzoneManager currentLeader = cluster.getOMLeader(); + addAdminToSpecificOM(currentLeader, TEST_USER); + assertThat(currentLeader.getOmAdminUsernames()).contains(TEST_USER); + + // Test user should be able to delete volume as admin + UserGroupInformation.setLoginUser(testUserUgi); + try (OzoneClient userClient = OzoneClientFactory.getRpcClient(OM_SERVICE_ID, cluster.getConf())) { + ObjectStore userObjectStore = userClient.getObjectStore(); + + userObjectStore.deleteVolume(testVolume1); + + // Transfer leadership + OzoneManager newLeader = transferLeadershipToAnotherNode(currentLeader); + assertThat(newLeader.getOmAdminUsernames()).doesNotContain(TEST_USER); + + // Should fail on new leader + OMException exception = assertThrows(OMException.class, () -> { + userObjectStore.deleteVolume(testVolume2); + }, "deleteVolume should fail for non-admin user on new leader"); + assertEquals(PERMISSION_DENIED, exception.getResult()); + } finally { + UserGroupInformation.setLoginUser(adminUserUgi); + } + } + + /** + * Tests that setBucketProperty ACL check is enforced in preExecute and is leader-specific. + */ + @Test + public void testBucketSetPropertyAclEnforcementAfterLeadershipChange() throws Exception { + ObjectStore adminObjectStore = client.getObjectStore(); + String testVolume = "bucketpropvol-" + + RandomStringUtils.secure().nextAlphabetic(5).toLowerCase(Locale.ROOT); + String testBucket = "bucketprop-" + + RandomStringUtils.secure().nextAlphabetic(5).toLowerCase(Locale.ROOT); + + // Create volume and bucket as admin + VolumeArgs volumeArgs = VolumeArgs.newBuilder() + .setOwner(adminUserUgi.getShortUserName()) + .build(); + adminObjectStore.createVolume(testVolume, volumeArgs); + OzoneVolume adminVolume = adminObjectStore.getVolume(testVolume); + + BucketArgs bucketArgs = BucketArgs.newBuilder().build(); + adminVolume.createBucket(testBucket, bucketArgs); + + // Add test user as admin on current leader + OzoneManager currentLeader = cluster.getOMLeader(); + addAdminToSpecificOM(currentLeader, TEST_USER); + assertThat(currentLeader.getOmAdminUsernames()).contains(TEST_USER); + + // Test user should be able to set bucket properties as admin + UserGroupInformation.setLoginUser(testUserUgi); + try (OzoneClient userClient = OzoneClientFactory.getRpcClient(OM_SERVICE_ID, cluster.getConf())) { + ObjectStore userObjectStore = userClient.getObjectStore(); + OzoneVolume userVolume = userObjectStore.getVolume(testVolume); + OzoneBucket userBucket = userVolume.getBucket(testBucket); + + // Set versioning + userBucket.setVersioning(true); + + // Transfer leadership + OzoneManager newLeader = transferLeadershipToAnotherNode(currentLeader); + assertThat(newLeader.getOmAdminUsernames()).doesNotContain(TEST_USER); + + // Should fail on new leader + OMException exception = assertThrows(OMException.class, () -> { + userBucket.setVersioning(false); + }, "setBucketProperty should fail for non-admin user on new leader"); + assertEquals(PERMISSION_DENIED, exception.getResult()); + } finally { + UserGroupInformation.setLoginUser(adminUserUgi); + } + } + + /** + * Tests that setBucketOwner ACL check is enforced in preExecute and is leader-specific. + */ + @Test + public void testBucketSetOwnerAclEnforcementAfterLeadershipChange() throws Exception { + ObjectStore adminObjectStore = client.getObjectStore(); + String testVolume = "bucketownervol-" + + RandomStringUtils.secure().nextAlphabetic(5).toLowerCase(Locale.ROOT); + String testBucket = "bucketowner-" + + RandomStringUtils.secure().nextAlphabetic(5).toLowerCase(Locale.ROOT); + + // Create volume and bucket as admin + VolumeArgs volumeArgs = VolumeArgs.newBuilder() + .setOwner(adminUserUgi.getShortUserName()) + .build(); + adminObjectStore.createVolume(testVolume, volumeArgs); + OzoneVolume adminVolume = adminObjectStore.getVolume(testVolume); + + BucketArgs bucketArgs = BucketArgs.newBuilder().build(); + adminVolume.createBucket(testBucket, bucketArgs); + + // Add test user as admin on current leader + OzoneManager currentLeader = cluster.getOMLeader(); + addAdminToSpecificOM(currentLeader, TEST_USER); + assertThat(currentLeader.getOmAdminUsernames()).contains(TEST_USER); + + // Test user should be able to set bucket owner as admin + UserGroupInformation.setLoginUser(testUserUgi); + try (OzoneClient userClient = OzoneClientFactory.getRpcClient(OM_SERVICE_ID, cluster.getConf())) { + ObjectStore userObjectStore = userClient.getObjectStore(); + OzoneVolume userVolume = userObjectStore.getVolume(testVolume); + OzoneBucket userBucket = userVolume.getBucket(testBucket); + + // Set new owner + userBucket.setOwner("newowner"); + + // Transfer leadership + OzoneManager newLeader = transferLeadershipToAnotherNode(currentLeader); + assertThat(newLeader.getOmAdminUsernames()).doesNotContain(TEST_USER); + + // Should fail on new leader + OMException exception = assertThrows(OMException.class, () -> { + userBucket.setOwner("anothernewowner"); + }, "setBucketOwner should fail for non-admin user on new leader"); + assertEquals(PERMISSION_DENIED, exception.getResult()); + } finally { + UserGroupInformation.setLoginUser(adminUserUgi); + } + } + + /** + * Tests that deleteBucket ACL check is enforced in preExecute and is leader-specific. + */ + @Test + public void testBucketDeleteAclEnforcementAfterLeadershipChange() throws Exception { + ObjectStore adminObjectStore = client.getObjectStore(); + String testVolume = "bucketdelvol-" + + RandomStringUtils.secure().nextAlphabetic(5).toLowerCase(Locale.ROOT); + String testBucket1 = "bucketdel1-" + + RandomStringUtils.secure().nextAlphabetic(5).toLowerCase(Locale.ROOT); + String testBucket2 = "bucketdel2-" + + RandomStringUtils.secure().nextAlphabetic(5).toLowerCase(Locale.ROOT); + + // Create volume and buckets as admin + VolumeArgs volumeArgs = VolumeArgs.newBuilder() + .setOwner(adminUserUgi.getShortUserName()) + .build(); + adminObjectStore.createVolume(testVolume, volumeArgs); + OzoneVolume adminVolume = adminObjectStore.getVolume(testVolume); + + BucketArgs bucketArgs = BucketArgs.newBuilder().build(); + adminVolume.createBucket(testBucket1, bucketArgs); + adminVolume.createBucket(testBucket2, bucketArgs); + + // Add test user as admin on current leader + OzoneManager currentLeader = cluster.getOMLeader(); + addAdminToSpecificOM(currentLeader, TEST_USER); + assertThat(currentLeader.getOmAdminUsernames()).contains(TEST_USER); + + // Test user should be able to delete bucket as admin + UserGroupInformation.setLoginUser(testUserUgi); + try (OzoneClient userClient = OzoneClientFactory.getRpcClient(OM_SERVICE_ID, cluster.getConf())) { + ObjectStore userObjectStore = userClient.getObjectStore(); + OzoneVolume userVolume = userObjectStore.getVolume(testVolume); + + userVolume.deleteBucket(testBucket1); + + // Transfer leadership + OzoneManager newLeader = transferLeadershipToAnotherNode(currentLeader); + assertThat(newLeader.getOmAdminUsernames()).doesNotContain(TEST_USER); + + // Should fail on new leader + OMException exception = assertThrows(OMException.class, () -> { + userVolume.deleteBucket(testBucket2); + }, "deleteBucket should fail for non-admin user on new leader"); + assertEquals(PERMISSION_DENIED, exception.getResult()); + } finally { + UserGroupInformation.setLoginUser(adminUserUgi); + } + } + + /** + * Tests that deleteKeys (bulk) ACL check is enforced in preExecute and is leader-specific. + * + *
This test verifies that when using the bulk deleteKeys API: + *
The test flow: + *
This test verifies that when using the bulk renameKeys API: + *
The test flow: + *
Note: This test uses LEGACY bucket layout because the bulk renameKeys API is deprecated
+ * and not supported for FILE_SYSTEM_OPTIMIZED layouts.
+ */
+ @Test
+ public void testKeysRenameAclEnforcementAfterLeadershipChange() throws Exception {
+ ObjectStore adminObjectStore = client.getObjectStore();
+ String testVolume = "keysrenamevol-" +
+ RandomStringUtils.secure().nextAlphabetic(5).toLowerCase(Locale.ROOT);
+ String testBucket = "keysrename-" +
+ RandomStringUtils.secure().nextAlphabetic(5).toLowerCase(Locale.ROOT);
+ String keyName1 = "key1-" +
+ RandomStringUtils.secure().nextAlphabetic(5).toLowerCase(Locale.ROOT);
+ String keyName2 = "key2-" +
+ RandomStringUtils.secure().nextAlphabetic(5).toLowerCase(Locale.ROOT);
+ String newKeyName1 = "newkey1-" +
+ RandomStringUtils.secure().nextAlphabetic(5).toLowerCase(Locale.ROOT);
+ String newKeyName2 = "newkey2-" +
+ RandomStringUtils.secure().nextAlphabetic(5).toLowerCase(Locale.ROOT);
+
+ // Step 1: Create volume, bucket with LEGACY layout, and keys as admin
+ VolumeArgs volumeArgs = VolumeArgs.newBuilder()
+ .setOwner(adminUserUgi.getShortUserName())
+ .build();
+ adminObjectStore.createVolume(testVolume, volumeArgs);
+ OzoneVolume adminVolume = adminObjectStore.getVolume(testVolume);
+
+ // Use LEGACY bucket layout since bulk renameKeys is not supported for FSO
+ BucketArgs bucketArgs = BucketArgs.newBuilder()
+ .setBucketLayout(BucketLayout.LEGACY)
+ .build();
+ adminVolume.createBucket(testBucket, bucketArgs);
+ OzoneBucket adminBucket = adminVolume.getBucket(testBucket);
+
+ // Create test keys
+ try (OzoneOutputStream out = adminBucket.createKey(keyName1, 0)) {
+ out.write("test data 1".getBytes(UTF_8));
+ }
+ try (OzoneOutputStream out = adminBucket.createKey(keyName2, 0)) {
+ out.write("test data 2".getBytes(UTF_8));
+ }
+
+ // Step 2: Add test user as admin on current leader
+ OzoneManager currentLeader = cluster.getOMLeader();
+ String originalLeaderNodeId = currentLeader.getOMNodeId();
+ addAdminToSpecificOM(currentLeader, TEST_USER);
+ assertThat(currentLeader.getOmAdminUsernames()).contains(TEST_USER);
+
+ // Step 3: Test user renames first key successfully using bulk API
+ UserGroupInformation.setLoginUser(testUserUgi);
+ try (OzoneClient userClient = OzoneClientFactory.getRpcClient(OM_SERVICE_ID, cluster.getConf())) {
+ ObjectStore userObjectStore = userClient.getObjectStore();
+ OzoneVolume userVolume = userObjectStore.getVolume(testVolume);
+ OzoneBucket userBucket = userVolume.getBucket(testBucket);
+
+ // Use bulk renameKeys API (this supports ACL filtering)
+ Map This test verifies that ACL operations on keys work correctly across leadership changes:
+ *
+ *
+ */
+ @Test
+ public void testKeyAclOperationsEnforcementAfterLeadershipChangeWithFSO() throws Exception {
+ ObjectStore adminObjectStore = client.getObjectStore();
+ String testVolume = "keyaclvol-" +
+ RandomStringUtils.secure().nextAlphabetic(5).toLowerCase(Locale.ROOT);
+ String testBucket = "keyaclbucket-" +
+ RandomStringUtils.secure().nextAlphabetic(5).toLowerCase(Locale.ROOT);
+ String keyName = "keyacl-" +
+ RandomStringUtils.secure().nextAlphabetic(5).toLowerCase(Locale.ROOT);
+
+ // Step 1: Create volume, bucket with FSO layout, and key as admin
+ VolumeArgs volumeArgs = VolumeArgs.newBuilder()
+ .setOwner(adminUserUgi.getShortUserName())
+ .build();
+ adminObjectStore.createVolume(testVolume, volumeArgs);
+ OzoneVolume adminVolume = adminObjectStore.getVolume(testVolume);
+
+ // Use FILE_SYSTEM_OPTIMIZED bucket layout
+ BucketArgs bucketArgs = BucketArgs.newBuilder()
+ .setBucketLayout(BucketLayout.FILE_SYSTEM_OPTIMIZED)
+ .build();
+ adminVolume.createBucket(testBucket, bucketArgs);
+ OzoneBucket adminBucket = adminVolume.getBucket(testBucket);
+
+ // Create a key as admin (so test user is NOT the owner)
+ try (OzoneOutputStream out = adminBucket.createKey(keyName, 0)) {
+ out.write("test data for ACL operations".getBytes(UTF_8));
+ }
+
+ OzoneKey key = adminBucket.getKey(keyName);
+ assertNotNull(key, "Key should be created successfully");
+
+ // Create OzoneObj for ACL operations
+ OzoneObj keyObj = OzoneObjInfo.Builder.newBuilder()
+ .setVolumeName(testVolume)
+ .setBucketName(testBucket)
+ .setKeyName(keyName)
+ .setResType(OzoneObj.ResourceType.KEY)
+ .setStoreType(OzoneObj.StoreType.OZONE)
+ .build();
+
+ int originalAclCount = adminObjectStore.getAcl(keyObj).size();
+
+ // Step 2: Add test user as admin on current leader
+ OzoneManager currentLeader = cluster.getOMLeader();
+ String leaderNodeId = currentLeader.getOMNodeId();
+ addAdminToSpecificOM(currentLeader, TEST_USER);
+ assertThat(currentLeader.getOmAdminUsernames()).contains(TEST_USER);
+
+ // Step 3: Test user performs ACL operations as admin (should succeed)
+ UserGroupInformation.setLoginUser(testUserUgi);
+ try (OzoneClient userClient = OzoneClientFactory.getRpcClient(OM_SERVICE_ID, cluster.getConf())) {
+ ObjectStore userObjectStore = userClient.getObjectStore();
+
+ // Add ACL - should succeed
+ OzoneAcl addAcl = OzoneAcl.parseAcl("user:anotheruser:rw[ACCESS]");
+ boolean addResult = userObjectStore.addAcl(keyObj, addAcl);
+ assertThat(addResult).isTrue();
+
+ // Verify ACL was added
+ List