diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
index badf581efefc..e06269f4474e 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALInputFormat.java
@@ -32,6 +32,7 @@
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hbase.regionserver.wal.WALHeaderEOFException;
 import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
@@ -161,6 +162,17 @@ private WALStreamReader openReader(Path path, long startPosition) throws IOExcep
           reader =
             WALFactory.createStreamReader(path.getFileSystem(conf), path, conf, startPosition);
           return reader;
+        } catch (WALHeaderEOFException wheofe) {
+          // We hit EOF while reading the WAL header. A file that ever had an entry synced to it
+          // necessarily has a complete, readable header (a sync flushes the header too), so a
+          // header EOF means the file holds nothing recoverable right now. For a file that is not
+          // being actively written (a closed/archived WAL, or one left empty by a crashed
+          // RegionServer) the header never appears, so retrying only delays an inevitable skip.
+          // The one case a retry could help is a WAL still being written by the legacy
+          // (non-async) writer that has not yet flushed its header; but we skip that too.
+          LOG.warn("Got WALHeaderEOFException opening reader for {}, skipping empty WAL file.",
+            path, wheofe);
+          return null;
         } catch (LeaseNotRecoveredException lnre) {
           // HBASE-15019 the WAL was not closed due to some hiccup.
           LOG.warn("Try to recover the WAL lease " + path, lnre);
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALInputFormat.java
index 92138e7dfe72..6cd331ccd966 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALInputFormat.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALInputFormat.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -36,6 +37,7 @@
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Tag;
@@ -120,4 +122,44 @@ public void testHandlesArchivedWALFiles() throws Exception {
     assertEquals(archiveWal.toString(), split.getLogFileName());
   }
 
+  /**
+   * Test that an empty WAL file (which causes WALHeaderEOFException) is gracefully handled and
+   * skipped rather than causing the job to fail.
+   */
+  @Test
+  public void testHandlesEmptyWALFile() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+
+    // Create an empty WAL file
+    Path walRootDir = CommonFSUtils.getWALRootDir(conf);
+    Path emptyWalFile =
+      new Path(walRootDir, "WALs/empty-wal-test/empty." + EnvironmentEdgeManager.currentTime());
+    TEST_UTIL.getTestFileSystem().mkdirs(emptyWalFile.getParent());
+    TEST_UTIL.getTestFileSystem().create(emptyWalFile).close();
+
+    JobContext ctx = Mockito.mock(JobContext.class);
+    conf.set(FileInputFormat.INPUT_DIR, emptyWalFile.toString());
+    conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
+    Mockito.when(ctx.getConfiguration()).thenReturn(conf);
+    Job job = Job.getInstance(conf);
+    TableMapReduceUtil.initCredentialsForCluster(job, conf);
+    Mockito.when(ctx.getCredentials()).thenReturn(job.getCredentials());
+
+    // Create record reader and verify it handles the empty file gracefully
+    try (WALInputFormat.WALKeyRecordReader reader = new WALInputFormat.WALKeyRecordReader()) {
+      TaskAttemptContext taskCtx = Mockito.mock(TaskAttemptContext.class);
+      Mockito.when(taskCtx.getConfiguration()).thenReturn(conf);
+
+      WALInputFormat wif = new WALInputFormat();
+      List<InputSplit> splits = wif.getSplits(ctx);
+      assertEquals(1, splits.size());
+      WALInputFormat.WALSplit split = (WALInputFormat.WALSplit) splits.get(0);
+
+      // This should not throw WALHeaderEOFException - it should return false for nextKeyValue()
+      reader.initialize(split, taskCtx);
+      // nextKeyValue() should return false since the file is empty (reader is null)
+      assertFalse(reader.nextKeyValue());
+    }
+  }
+
 }