Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ public interface LedgerOffloaderStats extends AutoCloseable {

void recordReadOffloadDataLatency(String topic, long latency, TimeUnit unit);

void recordOffloadExecutorQueueLatency(String topic, long latency, TimeUnit unit);

void recordReadOffloadExecutorQueueLatency(String topic, long latency, TimeUnit unit);

void recordDeleteOffloadOps(String topic, boolean succeed);


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ public void recordReadOffloadDataLatency(String topic, long latency, TimeUnit un

}

@Override
public void recordOffloadExecutorQueueLatency(String topic, long latency, TimeUnit unit) {

}

@Override
public void recordReadOffloadExecutorQueueLatency(String topic, long latency, TimeUnit unit) {

}

@Override
public void recordDeleteOffloadOps(String topic, boolean succeed) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ public final class LedgerOffloaderStatsImpl implements LedgerOffloaderStats, Run
private final Gauge readOffloadRate;
private final Summary readOffloadIndexLatency;
private final Summary readOffloadDataLatency;
private final Summary offloadExecutorQueueLatency;
private final Summary readOffloadExecutorQueueLatency;

private final Map<String, Long> topicAccess;
private final Map<String, Pair<LongAdder, LongAdder>> offloadAndReadOffloadBytesMap;
Expand Down Expand Up @@ -105,6 +107,21 @@ private LedgerOffloaderStatsImpl(boolean exposeTopicLevelMetrics,
.quantile(0.99, 0.01)
.quantile(1, 0.01)
.create().register();
this.offloadExecutorQueueLatency = Summary.build("brk_ledgeroffloader_offload_executor_queue_latency", "-")
.labelNames(labels)
.quantile(0.50, 0.01)
.quantile(0.95, 0.01)
.quantile(0.99, 0.01)
.quantile(1, 0.01)
.create().register();
this.readOffloadExecutorQueueLatency =
Summary.build("brk_ledgeroffloader_read_offload_executor_queue_latency", "-")
.labelNames(labels)
.quantile(0.50, 0.01)
.quantile(0.95, 0.01)
.quantile(0.99, 0.01)
.quantile(1, 0.01)
.create().register();
this.readLedgerLatency = Summary.build("brk_ledgeroffloader_read_ledger_latency", "-")
.labelNames(labels).quantile(0.50, 0.01)
.quantile(0.95, 0.01)
Expand Down Expand Up @@ -191,6 +208,20 @@ public void recordReadOffloadDataLatency(String topic, long latency, TimeUnit un
this.addOrUpdateTopicAccess(topic);
}

@Override
public void recordOffloadExecutorQueueLatency(String topic, long latency, TimeUnit unit) {
String[] labelValues = this.labelValues(topic);
this.offloadExecutorQueueLatency.labels(labelValues).observe(unit.toMicros(latency));
this.addOrUpdateTopicAccess(topic);
}

@Override
public void recordReadOffloadExecutorQueueLatency(String topic, long latency, TimeUnit unit) {
String[] labelValues = this.labelValues(topic);
this.readOffloadExecutorQueueLatency.labels(labelValues).observe(unit.toMicros(latency));
this.addOrUpdateTopicAccess(topic);
}

@Override
public void recordDeleteOffloadOps(String topic, boolean succeed) {
String status = succeed ? SUCCEED : FAILED;
Expand Down Expand Up @@ -247,6 +278,8 @@ private void cleanExpiredTopicMetrics() {
this.readOffloadRate.remove(labelValues);
this.readOffloadIndexLatency.remove(labelValues);
this.readOffloadDataLatency.remove(labelValues);
this.offloadExecutorQueueLatency.remove(labelValues);
this.readOffloadExecutorQueueLatency.remove(labelValues);

labelValues = this.labelValues(topic, SUCCEED);
this.deleteOffloadOps.remove(labelValues);
Expand Down Expand Up @@ -287,6 +320,8 @@ public synchronized void close() throws Exception {
CollectorRegistry.defaultRegistry.unregister(this.readOffloadRate);
CollectorRegistry.defaultRegistry.unregister(this.readOffloadIndexLatency);
CollectorRegistry.defaultRegistry.unregister(this.readOffloadDataLatency);
CollectorRegistry.defaultRegistry.unregister(this.offloadExecutorQueueLatency);
CollectorRegistry.defaultRegistry.unregister(this.readOffloadExecutorQueueLatency);
CollectorRegistry.defaultRegistry.unregister(this.deleteOffloadOps);
instance = null;
}
Expand Down Expand Up @@ -363,4 +398,16 @@ public Summary.Child.Value getReadOffloadDataLatency(String topic) {
String[] labels = this.labelValues(topic);
return this.readOffloadDataLatency.labels(labels).get();
}

@VisibleForTesting
public Summary.Child.Value getOffloadExecutorQueueLatency(String topic) {
String[] labels = this.labelValues(topic);
return this.offloadExecutorQueueLatency.labels(labels).get();
}

@VisibleForTesting
public Summary.Child.Value getReadOffloadExecutorQueueLatency(String topic) {
String[] labels = this.labelValues(topic);
return this.readOffloadExecutorQueueLatency.labels(labels).get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ public void testTopicLevelMetrics() throws Exception {
offloaderStats.recordReadOffloadError(topicName);
offloaderStats.recordReadOffloadError(topicName);
offloaderStats.recordReadOffloadIndexLatency(topicName, 1000000L, TimeUnit.NANOSECONDS);
offloaderStats.recordOffloadExecutorQueueLatency(topicName, 1000L, TimeUnit.NANOSECONDS);
offloaderStats.recordReadOffloadExecutorQueueLatency(topicName, 2000L, TimeUnit.NANOSECONDS);
offloaderStats.recordReadOffloadBytes(topicName, 100000);
offloaderStats.recordWriteToStorageError(topicName);
offloaderStats.recordWriteToStorageError(topicName);
Expand All @@ -83,6 +85,8 @@ public void testTopicLevelMetrics() throws Exception {
assertEquals((long) offloaderStats.getReadLedgerLatency(topicName).sum, 1);
assertEquals(offloaderStats.getReadOffloadError(topicName), 2);
assertEquals((long) offloaderStats.getReadOffloadIndexLatency(topicName).sum, 1000);
assertEquals((long) offloaderStats.getOffloadExecutorQueueLatency(topicName).sum, 1);
assertEquals((long) offloaderStats.getReadOffloadExecutorQueueLatency(topicName).sum, 2);
assertEquals(offloaderStats.getReadOffloadBytes(topicName), 100000);
assertEquals(offloaderStats.getWriteStorageError(topicName), 2);
}
Expand Down Expand Up @@ -116,6 +120,8 @@ public void testNamespaceLevelMetrics() throws Exception {
offloaderStats.recordReadLedgerLatency(topicName, 1000, TimeUnit.NANOSECONDS);
offloaderStats.recordReadOffloadError(topicName);
offloaderStats.recordReadOffloadIndexLatency(topicName, 1000000L, TimeUnit.NANOSECONDS);
offloaderStats.recordOffloadExecutorQueueLatency(topicName, 1000L, TimeUnit.NANOSECONDS);
offloaderStats.recordReadOffloadExecutorQueueLatency(topicName, 2000L, TimeUnit.NANOSECONDS);
offloaderStats.recordReadOffloadBytes(topicName, 100000);
offloaderStats.recordWriteToStorageError(topicName);
}
Expand All @@ -130,6 +136,8 @@ public void testNamespaceLevelMetrics() throws Exception {
assertEquals((long) offloaderStats.getReadLedgerLatency(topicName).sum, 6);
assertEquals(offloaderStats.getReadOffloadError(topicName), 6);
assertEquals((long) offloaderStats.getReadOffloadIndexLatency(topicName).sum, 6000);
assertEquals((long) offloaderStats.getOffloadExecutorQueueLatency(topicName).sum, 6);
assertEquals((long) offloaderStats.getReadOffloadExecutorQueueLatency(topicName).sum, 12);
assertEquals(offloaderStats.getReadOffloadBytes(topicName), 600000);
assertEquals(offloaderStats.getWriteStorageError(topicName), 6);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle, OffloadedLedge
private final DataInputStream dataStream;
private final ExecutorService executor;
private final OffsetsCache entryOffsetsCache;
private final LedgerOffloaderStats offloaderStats;
private final String topicName;
private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();

enum State {
Expand All @@ -83,13 +85,17 @@ enum State {
@VisibleForTesting
BlobStoreBackedReadHandleImpl(long ledgerId, OffloadIndexBlock index,
BackedInputStream inputStream, ExecutorService executor,
OffsetsCache entryOffsetsCache) {
OffsetsCache entryOffsetsCache,
LedgerOffloaderStats offloaderStats,
String topicName) {
this.ledgerId = ledgerId;
this.index = index;
this.inputStream = inputStream;
this.dataStream = new DataInputStream(inputStream);
this.executor = executor;
this.entryOffsetsCache = entryOffsetsCache;
this.offloaderStats = offloaderStats;
this.topicName = topicName;
state = State.Opened;
}

Expand All @@ -110,7 +116,7 @@ public CompletableFuture<Void> closeAsync() {
}

CompletableFuture<Void> promise = closeFuture.get();
executor.execute(() -> {
executor.execute(ExecutorLatencyUtils.trackReadOffloadExecutorQueueLatency(offloaderStats, topicName, () -> {
try {
index.close();
inputStream.close();
Expand All @@ -119,7 +125,7 @@ public CompletableFuture<Void> closeAsync() {
} catch (IOException t) {
promise.completeExceptionally(t);
}
});
}));
return promise;
}

Expand Down Expand Up @@ -328,7 +334,8 @@ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntr
lastAccessTimestamp = System.currentTimeMillis();
PENDING_READ_UPDATER.decrementAndGet(BlobStoreBackedReadHandleImpl.this);
});
executor.execute(new ReadTask(firstEntry, lastEntry, promise));
executor.execute(ExecutorLatencyUtils.trackReadOffloadExecutorQueueLatency(offloaderStats, topicName,
new ReadTask(firstEntry, lastEntry, promise)));
return promise;
}

Expand Down Expand Up @@ -434,7 +441,8 @@ public static ReadHandle open(ScheduledExecutorService executor,
BackedInputStream inputStream = new BlobStoreBackedInputStreamImpl(blobStore, bucket, key,
versionCheck, index.getDataObjectLength(), readBufferSize, offloaderStats, managedLedgerName);

return new BlobStoreBackedReadHandleImpl(ledgerId, index, inputStream, executor, entryOffsetsCache);
return new BlobStoreBackedReadHandleImpl(ledgerId, index, inputStream, executor, entryOffsetsCache,
offloaderStats, topicName);
}

// for testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ public class BlobStoreBackedReadHandleImplV2 implements ReadHandle {
private final List<BackedInputStream> inputStreams;
private final List<DataInputStream> dataStreams;
private final ExecutorService executor;
private final LedgerOffloaderStats offloaderStats;
private final String topicName;
private volatile State state = null;
private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();

Expand Down Expand Up @@ -101,7 +103,9 @@ public GroupedReader(long ledgerId, long firstEntry, long lastEntry,

private BlobStoreBackedReadHandleImplV2(long ledgerId, List<OffloadIndexBlockV2> indices,
List<BackedInputStream> inputStreams,
ExecutorService executor) {
ExecutorService executor,
LedgerOffloaderStats offloaderStats,
String topicName) {
this.ledgerId = ledgerId;
this.indices = indices;
this.inputStreams = inputStreams;
Expand All @@ -110,6 +114,8 @@ private BlobStoreBackedReadHandleImplV2(long ledgerId, List<OffloadIndexBlockV2>
dataStreams.add(new DataInputStream(inputStream));
}
this.executor = executor;
this.offloaderStats = offloaderStats;
this.topicName = topicName;
this.state = State.Opened;
}

Expand All @@ -131,7 +137,7 @@ public CompletableFuture<Void> closeAsync() {
}

CompletableFuture<Void> promise = closeFuture.get();
executor.execute(() -> {
executor.execute(ExecutorLatencyUtils.trackReadOffloadExecutorQueueLatency(offloaderStats, topicName, () -> {
try {
IOException first = null;
for (OffloadIndexBlockV2 indexBlock : indices) {
Expand Down Expand Up @@ -166,7 +172,7 @@ public CompletableFuture<Void> closeAsync() {
state = State.Closed;
promise.completeExceptionally(t);
}
});
}));
return promise;
}

Expand All @@ -176,7 +182,7 @@ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntr
log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, lastEntry);
}
CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
executor.execute(() -> {
executor.execute(ExecutorLatencyUtils.trackReadOffloadExecutorQueueLatency(offloaderStats, topicName, () -> {
if (state == State.Closed) {
log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}",
ledgerId, firstEntry, lastEntry);
Expand Down Expand Up @@ -257,7 +263,7 @@ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntr

}
promise.complete(LedgerEntriesImpl.create(entries));
});
}));
return promise;
}

Expand Down Expand Up @@ -359,6 +365,7 @@ public static ReadHandle open(ScheduledExecutorService executor,
inputStreams.add(inputStream);
indice.add(index);
}
return new BlobStoreBackedReadHandleImplV2(ledgerId, indice, inputStreams, executor);
return new BlobStoreBackedReadHandleImplV2(ledgerId, indice, inputStreams, executor, offloaderStats,
topicName);
}
}
Loading
Loading