Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

/**
* A minimal RecordReader that returns exactly one dummy record per InputSplit.
* A minimal RecordReader that returns one dummy record per {@link org.apache.phoenix.query.KeyRange
* KeyRange} carried by a {@link PhoenixInputSplit}.
* <p>
* Use this when your mapper:
* <ul>
Expand All @@ -35,42 +36,52 @@
* </ul>
* <p>
* This avoids the overhead of scanning and returning all rows when the mapper only needs to be
* triggered once per region/split. The standard {@link PhoenixRecordReader} iterates through all
* rows, calling {@code map()} for each row - which is wasteful when the mapper ignores the row data
* entirely.
* triggered per region. The standard {@link PhoenixRecordReader} iterates through all rows, calling
* {@code map()} for each row - which is wasteful when the mapper ignores the row data entirely.
* <p>
* <b>How it works:</b>
* <ul>
* <li>{@link #nextKeyValue()} returns {@code true} exactly once, then {@code false}</li>
* <li>This triggers {@code map()} exactly once per InputSplit (region)</li>
* <li>The mapper extracts region boundaries from the InputSplit, not from records</li>
* <li>{@link #initialize(InputSplit, TaskAttemptContext)} reads the {@link PhoenixInputSplit}'s key
* ranges to learn how many records to emit (one per range)</li>
* <li>{@link #nextKeyValue()} returns {@code true} once per range, then {@code false}</li>
* <li>This triggers {@code map()} once per range; for a coalesced split with N regions the mapper
* runs {@code map()} N times, giving the framework per-range visibility</li>
* <li>{@link #getProgress()} returns the fraction of ranges already consumed, so YARN sees real
* mapper progress instead of a 0% to 100% jump at the end</li>
* </ul>
* @see PhoenixSyncTableInputFormat
* @see PhoenixRecordReader
*/
public class PhoenixNoOpSingleRecordReader extends RecordReader<NullWritable, DBWritable> {
public class PhoenixNoOpPerRangeRecordReader extends RecordReader<NullWritable, DBWritable> {

private boolean hasRecord = true;
private int totalRanges = 1;
private int consumedRanges = 0;

/**
* Initialize the RecordReader. No initialization is needed since we return a single dummy record.
* Initialize the RecordReader. Reads the number of key ranges from the {@link PhoenixInputSplit}
* so subsequent {@link #nextKeyValue()} calls emit one record per range.
* @param split The InputSplit containing region boundaries
* @param context The task context
*/
@Override
public void initialize(InputSplit split, TaskAttemptContext context) {
// No initialization needed
if (split instanceof PhoenixInputSplit) {
int rangeCount = ((PhoenixInputSplit) split).getKeyRanges().size();
if (rangeCount > 0) {
this.totalRanges = rangeCount;
}
}
}

/**
* Returns true exactly once to trigger a single map() call per split.
* @return true on first call, false on subsequent calls which makes Mapper task to exit calling
* map method
* Returns true once per key range in the split, then false.
* @return true while ranges remain unprocessed; false once all ranges have been emitted, which
* makes the Mapper task exit calling map method
*/
@Override
public boolean nextKeyValue() {
if (hasRecord) {
hasRecord = false;
if (consumedRanges < totalRanges) {
consumedRanges++;
return true;
}
return false;
Expand All @@ -96,12 +107,16 @@ public DBWritable getCurrentValue() {
}

/**
* Returns progress: 0.0 before the record is consumed, 1.0 after.
* @return 0.0f if record not yet consumed, 1.0f otherwise
* Returns the fraction of ranges already consumed, so YARN reports real mapper progress as each
* range completes (important for coalesced splits where a single mapper covers many regions).
* @return progress in [0.0, 1.0]
*/
@Override
public float getProgress() {
return hasRecord ? 0.0f : 1.0f;
if (totalRanges == 0) {
return 1.0f;
}
return ((float) consumedRanges) / totalRanges;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ public enum Type {

public enum Status {
VERIFIED,
MISMATCHED
MISMATCHED,
REPAIRED,
UNREPAIRABLE,
REPAIR_FAILED
}

private String tableName;
Expand Down Expand Up @@ -143,69 +146,69 @@ public String getCounters() {

@VisibleForTesting
public long getSourceRowsProcessed() {
return CounterFormatter.parseSourceRows(counters);
return CounterFormatter.parseCounterValue(counters,
PhoenixSyncTableMapper.SyncCounters.SOURCE_ROWS_PROCESSED.name());
}

@VisibleForTesting
public long getTargetRowsProcessed() {
return CounterFormatter.parseTargetRows(counters);
return CounterFormatter.parseCounterValue(counters,
PhoenixSyncTableMapper.SyncCounters.TARGET_ROWS_PROCESSED.name());
}

/**
* Utility class for formatting and parsing counter strings. Encapsulates the counter format
* contract to ensure consistency between formatting (in mapper) and parsing (in tests).
*/
public static class CounterFormatter {
private static final String FORMAT_CHUNK = "%s=%d,%s=%d";
private static final String FORMAT_MAPPER = "%s=%d,%s=%d,%s=%d,%s=%d";
private static final String FORMAT_CHUNK =
"%s=%d,%s=%d,%s=%d,%s=%d,%s=%d,%s=%d,%s=%d,%s=%d,%s=%d";
private static final String FORMAT_MAPPER =
"%s=%d,%s=%d,%s=%d,%s=%d,%s=%d,%s=%d,%s=%d,%s=%d,%s=%d,%s=%d,%s=%d";

/**
* Formats chunk counters as comma-separated key=value pairs.
* @param sourceRows Source rows processed
* @param targetRows Target rows processed
* @return Formatted string: "SOURCE_ROWS_PROCESSED=123,TARGET_ROWS_PROCESSED=456"
* Formats chunk counters as comma-separated key=value pairs. Always emits all nine counters;
* unpopulated counters are 0 so operators querying the checkpoint table see a uniform format.
* {@code ROWS_DIFFERENT_ON_TARGET} is populated only in dry-run; cell-level counters and
* {@code ROWS_CANNOT_REPAIR} are populated only in repair mode.
*/
public static String formatChunk(long sourceRows, long targetRows) {
public static String formatChunk(long sourceRows, long targetRows, long rowsMissingOnTarget,
long rowsExtraOnTarget, long rowsDifferentOnTarget, long rowsCannotRepair,
long cellsMissingOnTarget, long cellsExtraOnTarget, long cellsDifferentOnTarget) {
return String.format(FORMAT_CHUNK,
PhoenixSyncTableMapper.SyncCounters.SOURCE_ROWS_PROCESSED.name(), sourceRows,
PhoenixSyncTableMapper.SyncCounters.TARGET_ROWS_PROCESSED.name(), targetRows);
PhoenixSyncTableMapper.SyncCounters.TARGET_ROWS_PROCESSED.name(), targetRows,
PhoenixSyncTableMapper.SyncCounters.ROWS_MISSING_ON_TARGET.name(), rowsMissingOnTarget,
PhoenixSyncTableMapper.SyncCounters.ROWS_EXTRA_ON_TARGET.name(), rowsExtraOnTarget,
PhoenixSyncTableMapper.SyncCounters.ROWS_DIFFERENT_ON_TARGET.name(), rowsDifferentOnTarget,
PhoenixSyncTableMapper.SyncCounters.ROWS_CANNOT_REPAIR.name(), rowsCannotRepair,
PhoenixSyncTableMapper.SyncCounters.CELLS_MISSING_ON_TARGET.name(), cellsMissingOnTarget,
PhoenixSyncTableMapper.SyncCounters.CELLS_EXTRA_ON_TARGET.name(), cellsExtraOnTarget,
PhoenixSyncTableMapper.SyncCounters.CELLS_DIFFERENT_ON_TARGET.name(),
cellsDifferentOnTarget);
}

/**
* Formats mapper counters as comma-separated key=value pairs.
* @param chunksVerified Chunks verified count
* @param chunksMismatched Chunks mismatched count
* @param sourceRows Source rows processed
* @param targetRows Target rows processed
* @return Formatted string with all mapper counters
* Formats mapper (region-level) counters as comma-separated key=value pairs. The seven drift
* counters are the per-region sum of the same fields emitted by {@link #formatChunk}.
*/
public static String formatMapper(long chunksVerified, long chunksMismatched, long sourceRows,
long targetRows) {
long targetRows, long rowsMissingOnTarget, long rowsExtraOnTarget, long rowsDifferentOnTarget,
long rowsCannotRepair, long cellsMissingOnTarget, long cellsExtraOnTarget,
long cellsDifferentOnTarget) {
return String.format(FORMAT_MAPPER,
PhoenixSyncTableMapper.SyncCounters.CHUNKS_VERIFIED.name(), chunksVerified,
PhoenixSyncTableMapper.SyncCounters.CHUNKS_MISMATCHED.name(), chunksMismatched,
PhoenixSyncTableMapper.SyncCounters.SOURCE_ROWS_PROCESSED.name(), sourceRows,
PhoenixSyncTableMapper.SyncCounters.TARGET_ROWS_PROCESSED.name(), targetRows);
}

/**
* Parses SOURCE_ROWS_PROCESSED value from counter string.
* @param counters Counter string in format "KEY1=val1,KEY2=val2,..."
* @return Source rows processed, or 0 if not found
*/
public static long parseSourceRows(String counters) {
return parseCounterValue(counters,
PhoenixSyncTableMapper.SyncCounters.SOURCE_ROWS_PROCESSED.name());
}

/**
* Parses TARGET_ROWS_PROCESSED value from counter string.
* @param counters Counter string in format "KEY1=val1,KEY2=val2,..."
* @return Target rows processed, or 0 if not found
*/
public static long parseTargetRows(String counters) {
return parseCounterValue(counters,
PhoenixSyncTableMapper.SyncCounters.TARGET_ROWS_PROCESSED.name());
PhoenixSyncTableMapper.SyncCounters.TARGET_ROWS_PROCESSED.name(), targetRows,
PhoenixSyncTableMapper.SyncCounters.ROWS_MISSING_ON_TARGET.name(), rowsMissingOnTarget,
PhoenixSyncTableMapper.SyncCounters.ROWS_EXTRA_ON_TARGET.name(), rowsExtraOnTarget,
PhoenixSyncTableMapper.SyncCounters.ROWS_DIFFERENT_ON_TARGET.name(), rowsDifferentOnTarget,
PhoenixSyncTableMapper.SyncCounters.ROWS_CANNOT_REPAIR.name(), rowsCannotRepair,
PhoenixSyncTableMapper.SyncCounters.CELLS_MISSING_ON_TARGET.name(), cellsMissingOnTarget,
PhoenixSyncTableMapper.SyncCounters.CELLS_EXTRA_ON_TARGET.name(), cellsExtraOnTarget,
PhoenixSyncTableMapper.SyncCounters.CELLS_DIFFERENT_ON_TARGET.name(),
cellsDifferentOnTarget);
}

/**
Expand All @@ -214,7 +217,7 @@ public static long parseTargetRows(String counters) {
* @param counterName Name of the counter to extract
* @return Counter value, or 0 if not found
*/
private static long parseCounterValue(String counters, String counterName) {
public static long parseCounterValue(String counters, String counterName) {
if (counters == null || counters.isEmpty()) {
return 0;
}
Expand Down
Loading