Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ public void close(boolean evictOnClose) throws IOException {
long offset = s.getCurBlock().getOffset();
LOG.trace("Seeking to split cell in reader: {} for file: {} top: {}, split offset: {}",
this, reference, top, offset);
((HFileReaderImpl) reader).getCacheConf().getBlockCache().ifPresent(cache -> {
((HFileReaderImpl) reader).getCacheConf().getCacheAccessService().ifEnabled(cache -> {
int numEvictedReferred = top
? cache.evictBlocksRangeByHfileName(referred, offset, Long.MAX_VALUE)
: cache.evictBlocksRangeByHfileName(referred, 0, offset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,8 @@ public boolean shouldCacheBlockOnRead(BlockCategory category, HFileInfo hFileInf
Configuration conf) {
Optional<Boolean> cacheFileBlock = Optional.of(true);
// For DATA blocks only, if BucketCache is in use, we don't need to cache block again
if (getBlockCache().isPresent() && category.equals(BlockCategory.DATA)) {
Optional<Boolean> result = getBlockCache().get().shouldCacheFile(hFileInfo, conf);
if (category.equals(BlockCategory.DATA)) {
Optional<Boolean> result = getCacheAccessService().shouldCacheFile(hFileInfo, conf);
if (result.isPresent()) {
cacheFileBlock = result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1075,7 +1075,7 @@ public long writeIndexBlocks(FSDataOutputStream out) throws IOException {
if (midKeyMetadata != null) blockStream.write(midKeyMetadata);
blockWriter.writeHeaderAndData(out);
if (cacheConf != null) {
cacheConf.getBlockCache().ifPresent(cache -> {
cacheConf.getCacheAccessService().ifEnabled(cache -> {
HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf);
cache.cacheBlock(new BlockCacheKey(nameForCaching, rootLevelIndexPos, true,
blockForCaching.getBlockType()), blockForCaching);
Expand Down Expand Up @@ -1169,7 +1169,7 @@ private void writeIntermediateBlock(FSDataOutputStream out, BlockIndexChunk pare
blockWriter.writeHeaderAndData(out);

if (getCacheOnWrite()) {
cacheConf.getBlockCache().ifPresent(cache -> {
cacheConf.getCacheAccessService().ifEnabled(cache -> {
HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf);
try {
cache.cacheBlock(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ public HFilePreadReader(ReaderContext context, HFileInfo fileInfo, CacheConfig c
fileInfo.initMetaAndIndex(this);
// master hosted regions, like the master procedures store wouldn't have a block cache
// Prefetch file blocks upon open if requested
if (cacheConf.getBlockCache().isPresent() && cacheConf.shouldPrefetchOnOpen()) {
if (cacheConf.getCacheAccessService().isCacheEnabled() && cacheConf.shouldPrefetchOnOpen()) {
PrefetchExecutor.request(path, new Runnable() {
@Override
public void run() {
long offset = 0;
long end = 0;
HFile.Reader prefetchStreamReader = null;
try {
cacheConf.getBlockCache().ifPresent(
cacheConf.getCacheAccessService().ifEnabled(
cache -> cache.waitForCacheInitialization(WAIT_TIME_FOR_CACHE_INITIALIZATION));
ReaderContext streamReaderContext = ReaderContextBuilder.newBuilder(context)
.withReaderType(ReaderContext.ReaderType.STREAM)
Expand Down Expand Up @@ -185,7 +185,7 @@ public void close(boolean evictOnClose) throws IOException {
// Deallocate blocks in load-on-open section
this.fileInfo.close();
// Deallocate data blocks
cacheConf.getBlockCache().ifPresent(cache -> {
cacheConf.getCacheAccessService().ifEnabled(cache -> {
if (evictOnClose) {
int numEvicted = cache.evictBlocksByHfileName(name);
if (LOG.isTraceEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1271,7 +1271,7 @@ public HFileBlock getMetaBlock(String metaBlockName, boolean cacheBlock) throws

// Cache the block
if (cacheBlock) {
cacheConf.getBlockCache().ifPresent(
cacheConf.getCacheAccessService().ifEnabled(
cache -> cache.cacheBlock(cacheKey, uncompressedBlock, cacheConf.isInMemory()));
}
return uncompressedBlock;
Expand All @@ -1290,7 +1290,7 @@ public HFileBlock getMetaBlock(String metaBlockName, boolean cacheBlock) throws
* boolean, boolean)
*/
private boolean shouldUseHeap(BlockType expectedBlockType, boolean cacheBlock) {
if (!cacheConf.getBlockCache().isPresent()) {
if (!cacheConf.getCacheAccessService().isCacheEnabled()) {
return false;
}

Expand Down Expand Up @@ -1411,7 +1411,7 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo
// Don't need the unpacked block back and we're storing the block in the cache compressed
if (cacheOnly && cacheCompressed && cacheOnRead) {
HFileBlock blockNoChecksum = BlockCacheUtil.getBlockForCaching(cacheConf, hfileBlock);
cacheConf.getBlockCache().ifPresent(cache -> {
cacheConf.getCacheAccessService().ifEnabled(cache -> {
LOG.debug("Skipping decompression of block {} in prefetch", cacheKey);
// Cache the block if necessary
if (cacheBlock && cacheOnRead) {
Expand All @@ -1427,7 +1427,7 @@ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final bo
HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);
HFileBlock unpackedNoChecksum = BlockCacheUtil.getBlockForCaching(cacheConf, unpacked);
// Cache the block if necessary
cacheConf.getBlockCache().ifPresent(cache -> {
cacheConf.getCacheAccessService().ifEnabled(cache -> {
if (cacheBlock && cacheOnRead) {
// Using the wait on cache during compaction and prefetching.
cache.cacheBlock(cacheKey,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.IndexBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
import org.apache.hadoop.hbase.io.hfile.cache.CacheAccessService;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.User;
Expand Down Expand Up @@ -576,7 +577,7 @@ private void writeInlineBlocks(boolean closing) throws IOException {
* @param offset the offset of the block we want to cache. Used to determine the cache key.
*/
private void doCacheOnWrite(long offset) {
cacheConf.getBlockCache().ifPresent(cache -> {
cacheConf.getCacheAccessService().ifEnabled(cache -> {
HFileBlock cacheFormatBlock = blockWriter.getBlockForCaching(cacheConf);
try {
BlockCacheKey key = buildCacheBlockKey(offset, cacheFormatBlock.getBlockType());
Expand All @@ -598,7 +599,7 @@ private BlockCacheKey buildCacheBlockKey(long offset, BlockType blockType) {
return new BlockCacheKey(name, offset, true, blockType);
}

private boolean shouldCacheBlock(BlockCache cache, BlockCacheKey key) {
private boolean shouldCacheBlock(CacheAccessService cache, BlockCacheKey key) {
Optional<Boolean> result =
cache.shouldCacheBlock(key, timeRangeTrackerForTiering.get().getMax(), conf);
return result.orElse(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hadoop.hbase.io.hfile.CacheStats;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.yetus.audience.InterfaceAudience;

/**
Expand Down Expand Up @@ -305,4 +306,19 @@ public void notifyFileCachingCompleted(Path fileName, int totalBlockCount, int d
Objects.requireNonNull(fileName, "fileName must not be null");
blockCache.notifyFileCachingCompleted(fileName, totalBlockCount, dataBlockCount, size);
}

@Override
public Optional<Boolean> shouldCacheFile(HFileInfo hFileInfo, Configuration conf) {
Objects.requireNonNull(hFileInfo, "hFileInfo must not be null");
Objects.requireNonNull(conf, "conf must not be null");
return blockCache.shouldCacheFile(hFileInfo, conf);
}

@Override
public Optional<Boolean> shouldCacheBlock(BlockCacheKey key, long maxTimestamp,
Configuration conf) {
Objects.requireNonNull(key, "key must not be null");
Objects.requireNonNull(conf, "conf must not be null");
return blockCache.shouldCacheBlock(key, maxTimestamp, conf);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@
*/
package org.apache.hadoop.hbase.io.hfile.cache;

import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheStats;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.yetus.audience.InterfaceAudience;

/**
Expand Down Expand Up @@ -119,8 +122,8 @@ public interface CacheAccessService {
*/
default Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
boolean updateCacheMetrics) {
CacheRequestContext context = CacheRequestContext.newBuilder().setCaching(caching)
.setRepeat(repeat).setUpdateCacheMetrics(updateCacheMetrics).build();
CacheRequestContext context = CacheRequestContext.newBuilder().withCaching(caching)
.withRepeat(repeat).withUpdateCacheMetrics(updateCacheMetrics).build();
return getBlock(cacheKey, context);
}

Expand All @@ -140,8 +143,9 @@ default Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repe
*/
default Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repeat,
boolean updateCacheMetrics, BlockType blockType) {
CacheRequestContext context = CacheRequestContext.newBuilder().setCaching(caching)
.setRepeat(repeat).setUpdateCacheMetrics(updateCacheMetrics).setBlockType(blockType).build();
CacheRequestContext context =
CacheRequestContext.newBuilder().withCaching(caching).withRepeat(repeat)
.withUpdateCacheMetrics(updateCacheMetrics).withBlockType(blockType).build();
return getBlock(cacheKey, context);
}

Expand Down Expand Up @@ -178,7 +182,7 @@ default Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean repe
* @param inMemory whether the block should be treated as in-memory
*/
default void cacheBlock(BlockCacheKey cacheKey, Cacheable block, boolean inMemory) {
CacheWriteContext context = CacheWriteContext.newBuilder().setInMemory(inMemory).build();
CacheWriteContext context = CacheWriteContext.newBuilder().withInMemory(inMemory).build();
cacheBlock(cacheKey, block, context);
}

Expand All @@ -196,8 +200,8 @@ default void cacheBlock(BlockCacheKey cacheKey, Cacheable block, boolean inMemor
*/
default void cacheBlock(BlockCacheKey cacheKey, Cacheable block, boolean inMemory,
boolean waitWhenCache) {
CacheWriteContext context =
CacheWriteContext.newBuilder().setInMemory(inMemory).setWaitWhenCache(waitWhenCache).build();
CacheWriteContext context = CacheWriteContext.newBuilder().withInMemory(inMemory)
.withWaitWhenCache(waitWhenCache).build();
cacheBlock(cacheKey, block, context);
}

Expand Down Expand Up @@ -444,4 +448,72 @@ default void notifyFileCachingCompleted(Path fileName, int totalBlockCount, int
long size) {
// noop
}

/**
* Executes the supplied action when this cache service is enabled.
* <p>
* This helper is intended to preserve the old {@code getBlockCache().ifPresent(...)} style for
* call sites that should do nothing when block cache is disabled. It keeps callers independent of
* concrete implementations such as {@code NoOpCacheAccessService} while still allowing disabled
* cache wiring to behave like an absent cache.
* </p>
* <p>
* Implementations normally do not need to override this method. The default implementation checks
* {@link #isCacheEnabled()} and invokes the supplied action only when cache access is enabled.
* </p>
* @param action action to execute with this cache service when enabled
*/
default void ifEnabled(Consumer<CacheAccessService> action) {
Objects.requireNonNull(action, "action must not be null");
if (isCacheEnabled()) {
action.accept(this);
}
}

/**
* Returns whether blocks from the given HFile should be cached. TODO: this method is a temporary
* adapter for file-level admission decisions. It will be removed and replaced by a more general
* admission API in the future.
* <p>
* This is a file-level admission hook used by cache population paths. Implementations may use
* file metadata, configuration, data tiering state, or implementation-specific bookkeeping to
* decide whether the file should be admitted into cache.
* </p>
* <p>
* The returned {@link Optional} is empty when the cache service does not support file-level
* admission decisions. In that case, callers should preserve existing default behavior, typically
* treating the result as "no opinion" rather than as a rejection.
* </p>
* @param hFileInfo HFile metadata used for the admission decision
* @param conf configuration
* @return empty if unsupported; otherwise whether the file should be cached
*/
default Optional<Boolean> shouldCacheFile(HFileInfo hFileInfo, Configuration conf) {
return Optional.empty();
}

/**
* Returns whether the block represented by the given key and timestamp should be cached. TODO:
* this method is a temporary adapter for file-level admission decisions. It will be removed and
* replaced by a more general admission API in the future.
* <p>
* <p>
* This is a block-level admission hook used by cache population paths. Implementations may use
* block identity, maximum timestamp, configuration, data tiering state, or
* implementation-specific policy to decide whether the block should be admitted into cache.
* </p>
* <p>
* The returned {@link Optional} is empty when the cache service does not support block-level
* admission decisions. In that case, callers should preserve existing default behavior, typically
* treating the result as "no opinion" rather than as a rejection.
* </p>
* @param key block cache key
* @param maxTimestamp maximum timestamp associated with the block
* @param conf configuration
* @return empty if unsupported; otherwise whether the block should be cached
*/
default Optional<Boolean> shouldCacheBlock(BlockCacheKey key, long maxTimestamp,
Configuration conf) {
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ private Builder() {
* @param caching true when caching is enabled
* @return this builder
*/
public Builder setCaching(boolean caching) {
public Builder withCaching(boolean caching) {
this.caching = caching;
return this;
}
Expand All @@ -132,7 +132,7 @@ public Builder setCaching(boolean caching) {
* @param repeat true when this is a repeated lookup
* @return this builder
*/
public Builder setRepeat(boolean repeat) {
public Builder withRepeat(boolean repeat) {
this.repeat = repeat;
return this;
}
Expand All @@ -142,7 +142,7 @@ public Builder setRepeat(boolean repeat) {
* @param updateCacheMetrics true when metrics should be updated
* @return this builder
*/
public Builder setUpdateCacheMetrics(boolean updateCacheMetrics) {
public Builder withUpdateCacheMetrics(boolean updateCacheMetrics) {
this.updateCacheMetrics = updateCacheMetrics;
return this;
}
Expand All @@ -152,7 +152,7 @@ public Builder setUpdateCacheMetrics(boolean updateCacheMetrics) {
* @param blockType expected block type
* @return this builder
*/
public Builder setBlockType(BlockType blockType) {
public Builder withBlockType(BlockType blockType) {
this.blockType = blockType;
return this;
}
Expand All @@ -162,7 +162,7 @@ public Builder setBlockType(BlockType blockType) {
* @param compaction true when compaction-related
* @return this builder
*/
public Builder setCompaction(boolean compaction) {
public Builder withCompaction(boolean compaction) {
this.compaction = compaction;
return this;
}
Expand All @@ -172,7 +172,7 @@ public Builder setCompaction(boolean compaction) {
* @param prefetch true when prefetch-related
* @return this builder
*/
public Builder setPrefetch(boolean prefetch) {
public Builder withPrefetch(boolean prefetch) {
this.prefetch = prefetch;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ private Builder() {
* @param inMemory true when in-memory treatment is requested
* @return this builder
*/
public Builder setInMemory(boolean inMemory) {
public Builder withInMemory(boolean inMemory) {
this.inMemory = inMemory;
return this;
}
Expand All @@ -120,7 +120,7 @@ public Builder setInMemory(boolean inMemory) {
* @param waitWhenCache true when insertion should wait
* @return this builder
*/
public Builder setWaitWhenCache(boolean waitWhenCache) {
public Builder withWaitWhenCache(boolean waitWhenCache) {
this.waitWhenCache = waitWhenCache;
return this;
}
Expand All @@ -130,7 +130,7 @@ public Builder setWaitWhenCache(boolean waitWhenCache) {
* @param cacheCompressed true when compressed caching is requested
* @return this builder
*/
public Builder setCacheCompressed(boolean cacheCompressed) {
public Builder withCacheCompressed(boolean cacheCompressed) {
this.cacheCompressed = cacheCompressed;
return this;
}
Expand All @@ -140,7 +140,7 @@ public Builder setCacheCompressed(boolean cacheCompressed) {
* @param blockCategory block category
* @return this builder
*/
public Builder setBlockCategory(BlockCategory blockCategory) {
public Builder withBlockCategory(BlockCategory blockCategory) {
this.blockCategory = blockCategory;
return this;
}
Expand All @@ -150,7 +150,7 @@ public Builder setBlockCategory(BlockCategory blockCategory) {
* @param source cache write source
* @return this builder
*/
public Builder setSource(CacheWriteSource source) {
public Builder withSource(CacheWriteSource source) {
this.source = source;
return this;
}
Expand Down
Loading
Loading