diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderStats.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderStats.java index 4059d4be94aad..e29729b500637 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderStats.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderStats.java @@ -48,6 +48,10 @@ public interface LedgerOffloaderStats extends AutoCloseable { void recordReadOffloadDataLatency(String topic, long latency, TimeUnit unit); + void recordOffloadExecutorQueueLatency(String topic, long latency, TimeUnit unit); + + void recordReadOffloadExecutorQueueLatency(String topic, long latency, TimeUnit unit); + void recordDeleteOffloadOps(String topic, boolean succeed); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderStatsDisable.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderStatsDisable.java index eeac9cfcfa994..1df20df2350fa 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderStatsDisable.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderStatsDisable.java @@ -68,6 +68,16 @@ public void recordReadOffloadDataLatency(String topic, long latency, TimeUnit un } + @Override + public void recordOffloadExecutorQueueLatency(String topic, long latency, TimeUnit unit) { + + } + + @Override + public void recordReadOffloadExecutorQueueLatency(String topic, long latency, TimeUnit unit) { + + } + @Override public void recordDeleteOffloadOps(String topic, boolean succeed) { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerOffloaderStatsImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerOffloaderStatsImpl.java index 5e05e4c8137cd..2d82b7d113a5e 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerOffloaderStatsImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerOffloaderStatsImpl.java @@ -58,6 +58,8 @@ public final class LedgerOffloaderStatsImpl implements LedgerOffloaderStats, Run private final Gauge readOffloadRate; private final Summary readOffloadIndexLatency; private final Summary readOffloadDataLatency; + private final Summary offloadExecutorQueueLatency; + private final Summary readOffloadExecutorQueueLatency; private final Map topicAccess; private final Map> offloadAndReadOffloadBytesMap; @@ -105,6 +107,21 @@ private LedgerOffloaderStatsImpl(boolean exposeTopicLevelMetrics, .quantile(0.99, 0.01) .quantile(1, 0.01) .create().register(); + this.offloadExecutorQueueLatency = Summary.build("brk_ledgeroffloader_offload_executor_queue_latency", "-") + .labelNames(labels) + .quantile(0.50, 0.01) + .quantile(0.95, 0.01) + .quantile(0.99, 0.01) + .quantile(1, 0.01) + .create().register(); + this.readOffloadExecutorQueueLatency = + Summary.build("brk_ledgeroffloader_read_offload_executor_queue_latency", "-") + .labelNames(labels) + .quantile(0.50, 0.01) + .quantile(0.95, 0.01) + .quantile(0.99, 0.01) + .quantile(1, 0.01) + .create().register(); this.readLedgerLatency = Summary.build("brk_ledgeroffloader_read_ledger_latency", "-") .labelNames(labels).quantile(0.50, 0.01) .quantile(0.95, 0.01) @@ -191,6 +208,20 @@ public void recordReadOffloadDataLatency(String topic, long latency, TimeUnit un this.addOrUpdateTopicAccess(topic); } + @Override + public void recordOffloadExecutorQueueLatency(String topic, long latency, TimeUnit unit) { + String[] labelValues = this.labelValues(topic); + this.offloadExecutorQueueLatency.labels(labelValues).observe(unit.toMicros(latency)); + this.addOrUpdateTopicAccess(topic); + } + + @Override + public void recordReadOffloadExecutorQueueLatency(String topic, long latency, TimeUnit unit) { + String[] labelValues = this.labelValues(topic); + this.readOffloadExecutorQueueLatency.labels(labelValues).observe(unit.toMicros(latency)); + this.addOrUpdateTopicAccess(topic); + } + @Override public void recordDeleteOffloadOps(String topic, boolean succeed) { String status = succeed ? SUCCEED : FAILED; @@ -247,6 +278,8 @@ private void cleanExpiredTopicMetrics() { this.readOffloadRate.remove(labelValues); this.readOffloadIndexLatency.remove(labelValues); this.readOffloadDataLatency.remove(labelValues); + this.offloadExecutorQueueLatency.remove(labelValues); + this.readOffloadExecutorQueueLatency.remove(labelValues); labelValues = this.labelValues(topic, SUCCEED); this.deleteOffloadOps.remove(labelValues); @@ -287,6 +320,8 @@ public synchronized void close() throws Exception { CollectorRegistry.defaultRegistry.unregister(this.readOffloadRate); CollectorRegistry.defaultRegistry.unregister(this.readOffloadIndexLatency); CollectorRegistry.defaultRegistry.unregister(this.readOffloadDataLatency); + CollectorRegistry.defaultRegistry.unregister(this.offloadExecutorQueueLatency); + CollectorRegistry.defaultRegistry.unregister(this.readOffloadExecutorQueueLatency); CollectorRegistry.defaultRegistry.unregister(this.deleteOffloadOps); instance = null; } @@ -363,4 +398,16 @@ public Summary.Child.Value getReadOffloadDataLatency(String topic) { String[] labels = this.labelValues(topic); return this.readOffloadDataLatency.labels(labels).get(); } + + @VisibleForTesting + public Summary.Child.Value getOffloadExecutorQueueLatency(String topic) { + String[] labels = this.labelValues(topic); + return this.offloadExecutorQueueLatency.labels(labels).get(); + } + + @VisibleForTesting + public Summary.Child.Value getReadOffloadExecutorQueueLatency(String topic) { + String[] labels = this.labelValues(topic); + return this.readOffloadExecutorQueueLatency.labels(labels).get(); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/LedgerOffloaderMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/LedgerOffloaderMetricsTest.java index b2c0b1c1d5a68..912a644c0cbaf 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/LedgerOffloaderMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/LedgerOffloaderMetricsTest.java @@ -72,6 +72,8 @@ public void testTopicLevelMetrics() throws Exception { offloaderStats.recordReadOffloadError(topicName); offloaderStats.recordReadOffloadError(topicName); offloaderStats.recordReadOffloadIndexLatency(topicName, 1000000L, TimeUnit.NANOSECONDS); + offloaderStats.recordOffloadExecutorQueueLatency(topicName, 1000L, TimeUnit.NANOSECONDS); + offloaderStats.recordReadOffloadExecutorQueueLatency(topicName, 2000L, TimeUnit.NANOSECONDS); offloaderStats.recordReadOffloadBytes(topicName, 100000); offloaderStats.recordWriteToStorageError(topicName); offloaderStats.recordWriteToStorageError(topicName); @@ -83,6 +85,8 @@ public void testTopicLevelMetrics() throws Exception { assertEquals((long) offloaderStats.getReadLedgerLatency(topicName).sum, 1); assertEquals(offloaderStats.getReadOffloadError(topicName), 2); assertEquals((long) offloaderStats.getReadOffloadIndexLatency(topicName).sum, 1000); + assertEquals((long) offloaderStats.getOffloadExecutorQueueLatency(topicName).sum, 1); + assertEquals((long) offloaderStats.getReadOffloadExecutorQueueLatency(topicName).sum, 2); assertEquals(offloaderStats.getReadOffloadBytes(topicName), 100000); assertEquals(offloaderStats.getWriteStorageError(topicName), 2); } @@ -116,6 +120,8 @@ public void testNamespaceLevelMetrics() throws Exception { offloaderStats.recordReadLedgerLatency(topicName, 1000, TimeUnit.NANOSECONDS); offloaderStats.recordReadOffloadError(topicName); offloaderStats.recordReadOffloadIndexLatency(topicName, 1000000L, TimeUnit.NANOSECONDS); + offloaderStats.recordOffloadExecutorQueueLatency(topicName, 1000L, TimeUnit.NANOSECONDS); + offloaderStats.recordReadOffloadExecutorQueueLatency(topicName, 2000L, TimeUnit.NANOSECONDS); offloaderStats.recordReadOffloadBytes(topicName, 100000); offloaderStats.recordWriteToStorageError(topicName); } @@ -130,6 +136,8 @@ public void testNamespaceLevelMetrics() throws Exception { assertEquals((long) offloaderStats.getReadLedgerLatency(topicName).sum, 6); assertEquals(offloaderStats.getReadOffloadError(topicName), 6); assertEquals((long) offloaderStats.getReadOffloadIndexLatency(topicName).sum, 6000); + assertEquals((long) offloaderStats.getOffloadExecutorQueueLatency(topicName).sum, 6); + assertEquals((long) offloaderStats.getReadOffloadExecutorQueueLatency(topicName).sum, 12); assertEquals(offloaderStats.getReadOffloadBytes(topicName), 600000); assertEquals(offloaderStats.getWriteStorageError(topicName), 6); } diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java index 77bcfa5cdd80f..7752a20171fa9 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java @@ -67,6 +67,8 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle, OffloadedLedge private final DataInputStream dataStream; private final ExecutorService executor; private final OffsetsCache entryOffsetsCache; + private final LedgerOffloaderStats offloaderStats; + private final String topicName; private final AtomicReference> closeFuture = new AtomicReference<>(); enum State { @@ -83,13 +85,17 @@ enum State { @VisibleForTesting BlobStoreBackedReadHandleImpl(long ledgerId, OffloadIndexBlock index, BackedInputStream inputStream, ExecutorService executor, - OffsetsCache entryOffsetsCache) { + OffsetsCache entryOffsetsCache, + LedgerOffloaderStats offloaderStats, + String topicName) { this.ledgerId = ledgerId; this.index = index; this.inputStream = inputStream; this.dataStream = new DataInputStream(inputStream); this.executor = executor; this.entryOffsetsCache = entryOffsetsCache; + this.offloaderStats = offloaderStats; + this.topicName = topicName; state = State.Opened; } @@ -110,7 +116,7 @@ public CompletableFuture closeAsync() { } CompletableFuture promise = closeFuture.get(); - executor.execute(() -> { + executor.execute(ExecutorLatencyUtils.trackReadOffloadExecutorQueueLatency(offloaderStats, topicName, () -> { try { index.close(); inputStream.close(); @@ -119,7 +125,7 @@ public CompletableFuture closeAsync() { } catch (IOException t) { promise.completeExceptionally(t); } - }); + })); return promise; } @@ -328,7 +334,8 @@ public CompletableFuture readAsync(long firstEntry, long lastEntr lastAccessTimestamp = System.currentTimeMillis(); PENDING_READ_UPDATER.decrementAndGet(BlobStoreBackedReadHandleImpl.this); }); - executor.execute(new ReadTask(firstEntry, lastEntry, promise)); + executor.execute(ExecutorLatencyUtils.trackReadOffloadExecutorQueueLatency(offloaderStats, topicName, + new ReadTask(firstEntry, lastEntry, promise))); return promise; } @@ -434,7 +441,8 @@ public static ReadHandle open(ScheduledExecutorService executor, BackedInputStream inputStream = new BlobStoreBackedInputStreamImpl(blobStore, bucket, key, versionCheck, index.getDataObjectLength(), readBufferSize, offloaderStats, managedLedgerName); - return new BlobStoreBackedReadHandleImpl(ledgerId, index, inputStream, executor, entryOffsetsCache); + return new BlobStoreBackedReadHandleImpl(ledgerId, index, inputStream, executor, entryOffsetsCache, + offloaderStats, topicName); } // for testing diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java index 1ac5047ba7198..8274eeed7d5c6 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java @@ -62,6 +62,8 @@ public class BlobStoreBackedReadHandleImplV2 implements ReadHandle { private final List inputStreams; private final List dataStreams; private final ExecutorService executor; + private final LedgerOffloaderStats offloaderStats; + private final String topicName; private volatile State state = null; private final AtomicReference> closeFuture = new AtomicReference<>(); @@ -101,7 +103,9 @@ public GroupedReader(long ledgerId, long firstEntry, long lastEntry, private BlobStoreBackedReadHandleImplV2(long ledgerId, List indices, List inputStreams, - ExecutorService executor) { + ExecutorService executor, + LedgerOffloaderStats offloaderStats, + String topicName) { this.ledgerId = ledgerId; this.indices = indices; this.inputStreams = inputStreams; @@ -110,6 +114,8 @@ private BlobStoreBackedReadHandleImplV2(long ledgerId, List dataStreams.add(new DataInputStream(inputStream)); } this.executor = executor; + this.offloaderStats = offloaderStats; + this.topicName = topicName; this.state = State.Opened; } @@ -131,7 +137,7 @@ public CompletableFuture closeAsync() { } CompletableFuture promise = closeFuture.get(); - executor.execute(() -> { + executor.execute(ExecutorLatencyUtils.trackReadOffloadExecutorQueueLatency(offloaderStats, topicName, () -> { try { IOException first = null; for (OffloadIndexBlockV2 indexBlock : indices) { @@ -166,7 +172,7 @@ public CompletableFuture closeAsync() { state = State.Closed; promise.completeExceptionally(t); } - }); + })); return promise; } @@ -176,7 +182,7 @@ public CompletableFuture readAsync(long firstEntry, long lastEntr log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, lastEntry); } CompletableFuture promise = new CompletableFuture<>(); - executor.execute(() -> { + executor.execute(ExecutorLatencyUtils.trackReadOffloadExecutorQueueLatency(offloaderStats, topicName, () -> { if (state == State.Closed) { log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}", ledgerId, firstEntry, lastEntry); @@ -257,7 +263,7 @@ public CompletableFuture readAsync(long firstEntry, long lastEntr } promise.complete(LedgerEntriesImpl.create(entries)); - }); + })); return promise; } @@ -359,6 +365,7 @@ public static ReadHandle open(ScheduledExecutorService executor, inputStreams.add(inputStream); indice.add(index); } - return new BlobStoreBackedReadHandleImplV2(ledgerId, indice, inputStreams, executor); + return new BlobStoreBackedReadHandleImplV2(ledgerId, indice, inputStreams, executor, offloaderStats, + topicName); } } diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java index 33bbc49ee2223..c79e0f00f1d55 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java @@ -179,6 +179,12 @@ private BlobStore getBlobStore(BlobStoreLocation blobStoreLocation) { }); } + private String toTopicName(String managedLedgerName) { + return Strings.isNullOrEmpty(managedLedgerName) + ? null + : TopicName.fromPersistenceNamingEncoding(managedLedgerName); + } + @Override public String getOffloadDriverName() { return config.getDriver(); @@ -198,9 +204,10 @@ public CompletableFuture offload(ReadHandle readHandle, UUID uuid, Map extraMetadata) { final String managedLedgerName = extraMetadata.get(MANAGED_LEDGER_NAME); - final String topicName = TopicName.fromPersistenceNamingEncoding(managedLedgerName); + final String topicName = toTopicName(managedLedgerName); CompletableFuture promise = new CompletableFuture<>(); - scheduler.chooseThread(readHandle.getId()).execute(() -> { + scheduler.chooseThread(readHandle.getId()).execute( + ExecutorLatencyUtils.trackOffloadExecutorQueueLatency(this.offloaderStats, topicName, () -> { final BlobStore writeBlobStore = getBlobStore(config.getBlobStoreLocation()); log.info("offload {} uuid {} extraMetadata {} to {} {}", readHandle.getId(), uuid, extraMetadata, config.getBlobStoreLocation(), writeBlobStore); @@ -330,7 +337,7 @@ public CompletableFuture offload(ReadHandle readHandle, promise.completeExceptionally(t); return; } - }); + })); return promise; } @@ -365,10 +372,12 @@ public CompletableFuture streamingOffload(@NonNull ManagedLedger streamingMpu = blobStore .initiateMultipartUpload(config.getBucket(), blob.getMetadata(), new PutOptions()); - scheduler.chooseThread(segmentInfo).execute(() -> { - log.info("start offloading segment: {}", segmentInfo); - streamingOffloadLoop(1, 0); - }); + String topicName = toTopicName(ml.getName()); + scheduler.chooseThread(segmentInfo).execute( + ExecutorLatencyUtils.trackOffloadExecutorQueueLatency(this.offloaderStats, topicName, () -> { + log.info("start offloading segment: {}", segmentInfo); + streamingOffloadLoop(1, 0); + })); scheduler.schedule(this::closeSegment, maxSegmentCloseTime.toMillis(), TimeUnit.MILLISECONDS); return CompletableFuture.completedFuture(new OffloadHandle() { @@ -565,7 +574,9 @@ public CompletableFuture readOffloaded(long ledgerId, UUID uid, CompletableFuture promise = new CompletableFuture<>(); String key = DataBlockUtils.dataBlockOffloadKey(ledgerId, uid); String indexKey = DataBlockUtils.indexBlockOffloadKey(ledgerId, uid); - readExecutor.chooseThread(ledgerId).execute(() -> { + String topicName = toTopicName(offloadDriverMetadata.get(MANAGED_LEDGER_NAME)); + readExecutor.chooseThread(ledgerId).execute( + ExecutorLatencyUtils.trackReadOffloadExecutorQueueLatency(this.offloaderStats, topicName, () -> { try { BlobStore readBlobstore = getBlobStore(config.getBlobStoreLocation()); promise.complete(BlobStoreBackedReadHandleImpl.open(readExecutor.chooseThread(ledgerId), @@ -579,7 +590,7 @@ public CompletableFuture readOffloaded(long ledgerId, UUID uid, log.error("Failed readOffloaded: ", t); promise.completeExceptionally(t); } - }); + })); return promise; } @@ -600,7 +611,9 @@ public CompletableFuture readOffloaded(long ledgerId, MLDataFormats. indexKeys.add(indexKey); }); - readExecutor.chooseThread(ledgerId).execute(() -> { + String topicName = toTopicName(offloadDriverMetadata.get(MANAGED_LEDGER_NAME)); + readExecutor.chooseThread(ledgerId).execute( + ExecutorLatencyUtils.trackReadOffloadExecutorQueueLatency(this.offloaderStats, topicName, () -> { try { BlobStore readBlobstore = getBlobStore(config.getBlobStoreLocation()); promise.complete(BlobStoreBackedReadHandleImplV2.open(readExecutor.chooseThread(ledgerId), @@ -613,7 +626,7 @@ public CompletableFuture readOffloaded(long ledgerId, MLDataFormats. log.error("Failed readOffloaded: ", t); promise.completeExceptionally(t); } - }); + })); return promise; } @@ -624,7 +637,9 @@ public CompletableFuture deleteOffloaded(long ledgerId, UUID uid, String readBucket = bsKey.getBucket(offloadDriverMetadata); CompletableFuture promise = new CompletableFuture<>(); - scheduler.chooseThread(ledgerId).execute(() -> { + String topicName = this.ml == null ? null : toTopicName(this.ml.getName()); + scheduler.chooseThread(ledgerId).execute( + ExecutorLatencyUtils.trackOffloadExecutorQueueLatency(this.offloaderStats, topicName, () -> { try { BlobStore readBlobstore = getBlobStore(config.getBlobStoreLocation()); readBlobstore.removeBlobs(readBucket, @@ -635,7 +650,7 @@ public CompletableFuture deleteOffloaded(long ledgerId, UUID uid, log.error("Failed delete Blob", t); promise.completeExceptionally(t); } - }); + })); return promise.whenComplete((__, t) -> { if (null != this.ml) { @@ -651,7 +666,8 @@ public CompletableFuture deleteOffloaded(UUID uid, Map off String readBucket = bsKey.getBucket(offloadDriverMetadata); CompletableFuture promise = new CompletableFuture<>(); - scheduler.execute(() -> { + String topicName = this.ml == null ? null : toTopicName(this.ml.getName()); + scheduler.execute(ExecutorLatencyUtils.trackOffloadExecutorQueueLatency(this.offloaderStats, topicName, () -> { try { BlobStore readBlobstore = getBlobStore(config.getBlobStoreLocation()); readBlobstore.removeBlobs(readBucket, @@ -662,7 +678,7 @@ public CompletableFuture deleteOffloaded(UUID uid, Map off log.error("Failed delete Blob", t); promise.completeExceptionally(t); } - }); + })); return promise.whenComplete((__, t) -> this.offloaderStats.recordDeleteOffloadOps( diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/ExecutorLatencyUtils.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/ExecutorLatencyUtils.java new file mode 100644 index 0000000000000..e9934c691aef6 --- /dev/null +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/ExecutorLatencyUtils.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.offload.jcloud.impl; + +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.mledger.LedgerOffloaderStats; + +final class ExecutorLatencyUtils { + + private ExecutorLatencyUtils() { + } + + static Runnable trackOffloadExecutorQueueLatency(LedgerOffloaderStats offloaderStats, + String topicName, + Runnable task) { + return trackQueueLatency(offloaderStats, topicName, task, false); + } + + static Runnable trackReadOffloadExecutorQueueLatency(LedgerOffloaderStats offloaderStats, + String topicName, + Runnable task) { + return trackQueueLatency(offloaderStats, topicName, task, true); + } + + private static Runnable trackQueueLatency(LedgerOffloaderStats offloaderStats, + String topicName, + Runnable task, + boolean readTask) { + final long enqueueTimeNanos = System.nanoTime(); + return () -> { + long queuedLatencyNanos = System.nanoTime() - enqueueTimeNanos; + if (readTask) { + offloaderStats.recordReadOffloadExecutorQueueLatency(topicName, queuedLatencyNanos, + TimeUnit.NANOSECONDS); + } else { + offloaderStats.recordOffloadExecutorQueueLatency(topicName, queuedLatencyNanos, + TimeUnit.NANOSECONDS); + } + task.run(); + }; + } +} diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplTest.java index bf116e5aeca58..0c173824f2a44 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplTest.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplTest.java @@ -38,6 +38,7 @@ import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.api.LedgerMetadata; +import org.apache.bookkeeper.mledger.LedgerOffloaderStatsDisable; import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream; import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock; import org.apache.bookkeeper.net.BookieId; @@ -115,7 +116,8 @@ private Pair createReadHandle( OffloadIndexEntryImpl.of(pair.getLeft(), 0, pair.getRight(), 0)); } // Build obj. - return Pair.of(new BlobStoreBackedReadHandleImpl(ledgerId, mockIndex, inputStream, executor, offsetsCache), + return Pair.of(new BlobStoreBackedReadHandleImpl(ledgerId, mockIndex, inputStream, executor, offsetsCache, + LedgerOffloaderStatsDisable.INSTANCE, null), data); } diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java index cd97954b97739..9cc9be4c19015 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java @@ -231,6 +231,8 @@ public void testOffloadAndReadMetrics() throws Exception { assertTrue(offloaderStats.getReadOffloadBytes(topic) > 0); assertTrue(offloaderStats.getReadOffloadDataLatency(topic).count > 0); assertTrue(offloaderStats.getReadOffloadIndexLatency(topic).count > 0); + assertTrue(offloaderStats.getOffloadExecutorQueueLatency(topic).count > 0); + assertTrue(offloaderStats.getReadOffloadExecutorQueueLatency(topic).count > 0); } @Test