diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index 2924416ad480..24c3c838b8fa 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -40,6 +40,7 @@ import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.io.FilenameUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -68,6 +69,7 @@ import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.mapred.JobContext; import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.mapreduce.security.TokenCache; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; @@ -293,6 +295,27 @@ Map buildCommonLocalResources() { return commonLocalResources; } + @VisibleForTesting + Credentials createLocalResourceCredentialsExcludingDefaultFS( + Map resourceMap) throws IOException, URISyntaxException { + String defaultFS = conf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY); + Credentials credentials = new Credentials(); + List ps = new ArrayList<>(); + + for (LocalResource resource : resourceMap.values()) { + Path p = resource.getResource().toPath(); + String path = p.toString(); + if (path.startsWith("/") || path.startsWith(defaultFS)) { + LOG.info("skip collecting path to issue token because it is defaultFS: path={}", p); + } else { + LOG.info("collect path to issue token: path={}", p); + ps.add(p); + } + } + TokenCache.obtainTokensForNamenodes(credentials, ps.toArray(new Path[0]), conf); + return credentials; + } + /** * Opens a Tez session without performing a complete rollback/cleanup on failure. * diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java index ffd1081d56f6..159ed4de4b7d 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java @@ -16,23 +16,36 @@ */ package org.apache.hadoop.hive.ql.exec.tez; +import java.net.URISyntaxException; import java.io.IOException; import java.nio.file.Files; -import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConfForTest; +import org.apache.hadoop.mapreduce.security.TokenCache; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.URL; import org.apache.tez.dag.api.TezException; import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.mockito.ArgumentCaptor; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + public class TestTezSessionState { private static final Logger LOG = LoggerFactory.getLogger(TestTezSessionState.class.getName()); @@ -46,8 +59,8 @@ private static SessionState createSessionState() { @Test public void testSymlinkedLocalFilesAreLocalizedOnce() throws Exception { - Path jarPath = Files.createTempFile("jar", ""); - Path symlinkPath = Paths.get(jarPath.toString() + ".symlink"); + java.nio.file.Path jarPath = Files.createTempFile("jar", ""); + java.nio.file.Path symlinkPath = Paths.get(jarPath.toString() + ".symlink"); Files.createSymbolicLink(symlinkPath, jarPath); // write some data into the fake jar, it's not a 0 length file in real life @@ -104,7 +117,7 @@ void openInternalUnsafe(boolean isAsync, SessionState.LogHelper console) */ @Test public void testCommonLocalResourcesPopulatedOnSessionOpen() throws Exception { - Path jarPath = Files.createTempFile("test-jar", ".jar"); + java.nio.file.Path = Files.createTempFile("test-jar", ".jar"); Files.write(jarPath, "testCommonLocalResourcesPopulated".getBytes(), StandardOpenOption.APPEND); SessionState ss = createSessionState(); @@ -133,4 +146,99 @@ void openInternalUnsafe(boolean isAsync, SessionState.LogHelper console) { sessionStateForTest.open(resources); } -} \ No newline at end of file + + // --- createLocalResourceCredentialsExcludingDefaultFS ------------------------------------------ + + private static final String DEFAULT_FS = "hdfs://defaultnn"; + + private static LocalResource resourceAt(String pathStr) throws URISyntaxException { + LocalResource lr = Mockito.mock(LocalResource.class); + URL url = Mockito.mock(URL.class); + Mockito.when(lr.getResource()).thenReturn(url); + Mockito.when(url.toPath()).thenReturn(new Path(pathStr)); + return lr; + } + + private static Map resourceMapOf(String... pathStrs) throws URISyntaxException { + // preserve insertion order so the assertions don't depend on HashMap ordering + Map m = new LinkedHashMap<>(); + for (int i = 0; i < pathStrs.length; i++) { + m.put("r" + i, resourceAt(pathStrs[i])); + } + return m; + } + + private static TezSessionState newSession() { + HiveConf conf = new HiveConf(); + conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, DEFAULT_FS); + return new TezSessionState(DagUtils.getInstance(), conf); + } + + @Test + public void testCredentialsEmptyResourceMapDoesNotRequestTokens() throws Exception { + TezSessionState sessionState = newSession(); + try (MockedStatic tokenCache = Mockito.mockStatic(TokenCache.class)) { + Credentials credentials = sessionState.createLocalResourceCredentialsExcludingDefaultFS(new HashMap<>()); + + Assert.assertNotNull(credentials); + ArgumentCaptor paths = ArgumentCaptor.forClass(Path[].class); + tokenCache.verify(() -> TokenCache.obtainTokensForNamenodes( + Mockito.eq(credentials), paths.capture(), Mockito.any())); + Assert.assertEquals(0, paths.getValue().length); + } + } + + @Test + public void testCredentialsDefaultFsResourceIsSkipped() throws Exception { + TezSessionState sessionState = newSession(); + Map resources = resourceMapOf(DEFAULT_FS + "/user/x/hive-exec.jar", "/user/x/hadoop-auth.jar"); + + try (MockedStatic tokenCache = Mockito.mockStatic(TokenCache.class)) { + Credentials credentials = sessionState.createLocalResourceCredentialsExcludingDefaultFS(resources); + + ArgumentCaptor paths = ArgumentCaptor.forClass(Path[].class); + tokenCache.verify(() -> TokenCache.obtainTokensForNamenodes( + Mockito.eq(credentials), paths.capture(), Mockito.any())); + Assert.assertEquals(0, paths.getValue().length); + } + } + + @Test + public void testCredentialsNonDefaultFsResourceCollectsPath() throws Exception { + TezSessionState sessionState = newSession(); + String addedJar = "hdfs://othernn/user/u/add-jar.jar"; + Map resources = resourceMapOf(addedJar); + + try (MockedStatic tokenCache = Mockito.mockStatic(TokenCache.class)) { + Credentials credentials = sessionState.createLocalResourceCredentialsExcludingDefaultFS(resources); + + ArgumentCaptor paths = ArgumentCaptor.forClass(Path[].class); + tokenCache.verify(() -> TokenCache.obtainTokensForNamenodes( + Mockito.eq(credentials), paths.capture(), Mockito.any())); + Assert.assertArrayEquals(new Path[] { new Path(addedJar) }, paths.getValue()); + } + } + + @Test + public void testCredentialsMixedResourcesOnlyNonDefaultFsCollected() throws Exception { + TezSessionState sessionState = newSession(); + String addedJarA = "hdfs://nn-a/user/u/jar-a.jar"; + String addedJarB = "hdfs://nn-b/user/u/jar-b.jar"; + Map resources = resourceMapOf( + DEFAULT_FS + "/tmp/hive/session/hive-exec.jar", // defaultFS — skipped + "/user/x/hadoop-auth.jar", // defaultFS — skipped + addedJarA, // collected + addedJarB); // collected + + try (MockedStatic tokenCache = Mockito.mockStatic(TokenCache.class)) { + Credentials credentials = sessionState.createLocalResourceCredentialsExcludingDefaultFS(resources); + + ArgumentCaptor paths = ArgumentCaptor.forClass(Path[].class); + tokenCache.verify(() -> TokenCache.obtainTokensForNamenodes( + Mockito.eq(credentials), paths.capture(), Mockito.any())); + Assert.assertEquals( + Arrays.asList(new Path(addedJarA), new Path(addedJarB)), + Arrays.asList(paths.getValue())); + } + } +}