Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
9935b28
Add support for multi-tenancy benchmark test + client telemetry close
Feb 23, 2026
39819b1
update
Feb 24, 2026
65485db
Wire connectionSharingAcrossClientsEnabled through benchmark harness
Feb 25, 2026
a4027e9
Update BENCHMARK_RESULTS.md with deep investigation findings
Feb 25, 2026
dc53947
Enable Reactor Netty connection pool metrics via SimpleMeterRegistry
Feb 25, 2026
fae26c3
Fix: register SimpleMeterRegistry on globalRegistry before client cre…
Feb 25, 2026
2068219
Add per-endpoint pool names for connection pool metrics
Feb 25, 2026
3ac4e4e
Add pool id tag and full tag dump to pool metrics logging
Feb 25, 2026
06eb58c
Wire App Insights MeterRegistry into Metrics.globalRegistry for pool …
Feb 25, 2026
26da910
Add NettyHttpMetricsReporter for clean pool metrics CSV export
Feb 25, 2026
d0ed2a6
Make Netty HTTP client metrics opt-in via system property
Feb 25, 2026
c8c251b
Wire http2Enabled through benchmark harness (field, getter, setter, a…
Feb 25, 2026
928ede4
Fix: wire NettyHttpMetricsReporter into BenchmarkOrchestrator lifecycle
Feb 25, 2026
585dddf
Clear COSMOS.NETTY_HTTP_CLIENT_METRICS_ENABLED system property on shu…
Feb 25, 2026
908bd0a
Fix: add SimpleMeterRegistry as backing store for Reactor Netty pool …
Feb 26, 2026
f50cc59
Fix: pre-population concurrency uses configured value instead of hard…
Feb 26, 2026
0c7605f
merge from main and resolve conflicts
Feb 26, 2026
b08565c
merge and resolve conflicts
Feb 26, 2026
c11f7bf
delete unrelated files
Feb 26, 2026
1b0cc05
refactor
Feb 26, 2026
989f6fc
gitignore: exclude benchmark docs/scripts and copilot agents/skills
Feb 26, 2026
e81e6f9
remove local md files
Feb 26, 2026
db641b5
remove local md files
Feb 26, 2026
5cb41e8
Merge branch 'wireConnectionSharingInBenchmark' of https://github.com…
Feb 26, 2026
9f7c73d
Simplify benchmark .gitignore: use docs/ instead of individual file e…
Feb 26, 2026
af967bf
Merge branch 'wireConnectionSharingInBenchmark' of https://github.com…
Feb 26, 2026
1345869
delete
Feb 26, 2026
922835a
Fix: per-tenant Dropwizard meter names to avoid HdrHistogram contention
Feb 27, 2026
903a5a4
Revert "Fix: per-tenant Dropwizard meter names to avoid HdrHistogram …
Feb 27, 2026
76e389e
Address PR review comments
Feb 27, 2026
705b84b
Add enableNettyHttpMetrics to EXCLUDED_FIELDS allowlist
Feb 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ abstract class AsyncBenchmark<T> {
.preferredRegions(cfg.getPreferredRegionsList())
.consistencyLevel(cfg.getConsistencyLevel())
.userAgentSuffix(cfg.getApplicationName())
.contentResponseOnWriteEnabled(cfg.isContentResponseOnWriteEnabled());
.contentResponseOnWriteEnabled(cfg.isContentResponseOnWriteEnabled())
.connectionSharingAcrossClientsEnabled(cfg.isConnectionSharingAcrossClientsEnabled());

clientBuilderAccessor
.setRegionScopedSessionCapturingEnabled(benchmarkSpecificClientBuilder, cfg.isRegionScopedSessionContainerEnabled());
Expand Down Expand Up @@ -252,7 +253,14 @@ uuid, new PartitionKey(partitionKey), PojoizedJson.class)
}
}

docsToRead = Flux.merge(Flux.fromIterable(createDocumentObservables), 100).collectList().block();
if (createDocumentObservables.isEmpty()) {
docsToRead = new ArrayList<>();
} else {
int prePopConcurrency = Math.max(1, Math.min(cfg.getConcurrency(), 100));
docsToRead = Flux.merge(Flux.fromIterable(createDocumentObservables), prePopConcurrency)
.collectList()
.block();
}
logger.info("Finished pre-populating {} documents", cfg.getNumberOfPreCreatedDocuments());

init();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class BenchmarkConfig {
private boolean suppressCleanup = false;
private boolean gcBetweenCycles = true;
private boolean enableJvmStats = false;
private boolean enableNettyHttpMetrics = false;

// -- Reporting --
private String reportingDirectory;
Expand Down Expand Up @@ -81,6 +82,7 @@ public static BenchmarkConfig fromConfiguration(Configuration cfg) throws IOExce

config.gcBetweenCycles = cfg.isGcBetweenCycles();
config.enableJvmStats = cfg.isEnableJvmStats();
config.enableNettyHttpMetrics = cfg.isEnableNettyHttpMetrics();

// Reporting
config.reportingDirectory = cfg.getReportingDirectory() != null
Expand Down Expand Up @@ -127,6 +129,7 @@ public static BenchmarkConfig fromConfiguration(Configuration cfg) throws IOExce
public boolean isSuppressCleanup() { return suppressCleanup; }
public boolean isGcBetweenCycles() { return gcBetweenCycles; }
public boolean isEnableJvmStats() { return enableJvmStats; }
public boolean isEnableNettyHttpMetrics() { return enableNettyHttpMetrics; }

public String getReportingDirectory() { return reportingDirectory; }
public int getPrintingInterval() { return printingInterval; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;

import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -123,24 +126,46 @@ public void run(BenchmarkConfig config) throws Exception {
config.getResultUploadDatabase(), config.getResultUploadContainer());
}

// Netty HTTP connection pool metrics reporter (only when enabled)
NettyHttpMetricsReporter nettyMetricsReporter = null;
SimpleMeterRegistry nettyHttpMeterRegistry = null;
// Add a SimpleMeterRegistry to globalRegistry when netty metrics are enabled,
if (config.isEnableNettyHttpMetrics() && config.getReportingDirectory() != null) {
nettyHttpMeterRegistry = new SimpleMeterRegistry();
Metrics.addRegistry(nettyHttpMeterRegistry);
logger.info("SimpleMeterRegistry added to globalRegistry for Reactor Netty pool gauge backing");

Path nettyMetricsDir = Paths.get(config.getReportingDirectory());
nettyMetricsReporter = new NettyHttpMetricsReporter(nettyHttpMeterRegistry, nettyMetricsDir);
nettyMetricsReporter.start(config.getPrintingInterval(), TimeUnit.SECONDS);
}
Comment thread
xinlian12 marked this conversation as resolved.

reporter.report();
logger.info("[LIFECYCLE] PRE_CREATE timestamp={}", Instant.now());
logger.info("BenchmarkConfig: {}", config);

// ======== Lifecycle loop ========
runLifecycleLoop(config, registry, reporter);

// Cleanup reporters
reporter.report();
reporter.stop();
if (resultReporter != null) {
resultReporter.report();
resultReporter.stop();
}
if (resultUploaderClient != null) {
resultUploaderClient.close();
try {
runLifecycleLoop(config, registry, reporter);
} finally {
// Cleanup reporters
reporter.report();
reporter.stop();
if (resultReporter != null) {
resultReporter.report();
resultReporter.stop();
}
if (resultUploaderClient != null) {
resultUploaderClient.close();
}
if (nettyMetricsReporter != null) {
nettyMetricsReporter.stop();
}
if (nettyHttpMeterRegistry != null) {
Metrics.removeRegistry(nettyHttpMeterRegistry);
}
clearGlobalSystemProperties();
}
clearGlobalSystemProperties();
}

// ======== Lifecycle loop (create -> run -> close -> settle x N) ========
Expand Down Expand Up @@ -348,6 +373,7 @@ private void clearGlobalSystemProperties() {
System.clearProperty("COSMOS.E2E_TIMEOUT_ERROR_HIT_THRESHOLD_FOR_PPAF");
System.clearProperty("COSMOS.E2E_TIMEOUT_ERROR_HIT_TIME_WINDOW_IN_SECONDS_FOR_PPAF");
System.clearProperty("COSMOS.MIN_CONNECTION_POOL_SIZE_PER_ENDPOINT");
System.clearProperty("COSMOS.NETTY_HTTP_CLIENT_METRICS_ENABLED");
}

private void setGlobalSystemProperties(BenchmarkConfig config) {
Expand All @@ -373,6 +399,11 @@ private void setGlobalSystemProperties(BenchmarkConfig config) {
String.valueOf(config.getMinConnectionPoolSizePerEndpoint()));
}

if (config.isEnableNettyHttpMetrics()) {
System.setProperty("COSMOS.NETTY_HTTP_CLIENT_METRICS_ENABLED", "true");
logger.info("Reactor Netty HTTP connection pool metrics enabled");
}

logger.info("Global system properties set (circuit breaker: {}, PPAF: {}, minConnPoolSize: {})",
config.isPartitionLevelCircuitBreakerEnabled(),
config.isPerPartitionAutomaticFailoverRequired(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ public class Configuration {
@Parameter(names = "-maxConnectionPoolSize", description = "Max Connection Pool Size")
private Integer maxConnectionPoolSize = 1000;

@Parameter(names = "-connectionSharingAcrossClientsEnabled", description = "Enable connection sharing across CosmosClient instances (Gateway mode). Reduces connection count for multi-tenant scenarios.")
private boolean connectionSharingAcrossClientsEnabled = false;

@Parameter(names = "-diagnosticsThresholdDuration", description = "Latency threshold for printing diagnostics", converter = DurationConverter.class)
private Duration diagnosticsThresholdDuration = Duration.ofSeconds(60);

Expand All @@ -92,6 +95,9 @@ public class Configuration {
@Parameter(names = "-enableJvmStats", description = "Enables JVM Stats")
private boolean enableJvmStats;

@Parameter(names = "-enableNettyHttpMetrics", description = "Enables Reactor Netty HTTP client metrics (connection pool gauges via COSMOS.NETTY_HTTP_CLIENT_METRICS_ENABLED)")
private boolean enableNettyHttpMetrics;

@Parameter(names = "-throughput", description = "provisioned throughput for test container")
private int throughput = 100000;

Expand Down Expand Up @@ -488,6 +494,10 @@ public Integer getMaxConnectionPoolSize() {
return maxConnectionPoolSize;
}

public boolean isConnectionSharingAcrossClientsEnabled() {
return connectionSharingAcrossClientsEnabled;
}

public ConnectionMode getConnectionMode() {
return connectionMode;
}
Expand Down Expand Up @@ -540,6 +550,10 @@ public boolean isEnableJvmStats() {
return enableJvmStats;
}

public boolean isEnableNettyHttpMetrics() {
return enableNettyHttpMetrics;
}

public MeterRegistry getAzureMonitorMeterRegistry() {
String instrumentationKey = System.getProperty("azure.cosmos.monitoring.azureMonitor.instrumentationKey",
StringUtils.defaultString(Strings.emptyToNull(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.benchmark;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.MeterRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.time.Instant;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
* Periodically samples Reactor Netty connection pool metrics from a Micrometer
* {@link MeterRegistry} and writes them to a CSV file.
*
* <p>Metrics captured (when {@code ConnectionProvider.metrics(true)} is enabled):</p>
* <ul>
* <li>{@code reactor.netty.connection.provider.total.connections}</li>
* <li>{@code reactor.netty.connection.provider.active.connections}</li>
* <li>{@code reactor.netty.connection.provider.idle.connections}</li>
* <li>{@code reactor.netty.connection.provider.pending.connections}</li>
* <li>{@code reactor.netty.connection.provider.max.connections}</li>
* <li>{@code reactor.netty.connection.provider.max.pending.connections}</li>
* </ul>
*
* <p>CSV columns: timestamp, metric, pool_id, pool_name, remote_address, value</p>
*/
public class NettyHttpMetricsReporter {
Comment thread
xinlian12 marked this conversation as resolved.

private static final Logger logger = LoggerFactory.getLogger(NettyHttpMetricsReporter.class);
private static final String METRIC_PREFIX = "reactor.netty.connection.provider";
private static final String CSV_HEADER = "timestamp,metric,pool_id,pool_name,remote_address,value";

private final MeterRegistry registry;
private final Path outputFile;
private final ScheduledExecutorService scheduler;
private BufferedWriter writer;

/**
* @param registry the Micrometer registry to query (typically {@code Metrics.globalRegistry})
* @param outputDir directory to write the CSV file into
*/
public NettyHttpMetricsReporter(MeterRegistry registry, Path outputDir) {
this.registry = registry;
this.outputFile = outputDir.resolve("netty-pool-metrics.csv");
this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r, "netty-metrics-reporter");
t.setDaemon(true);
return t;
});
}

/**
* Start periodic reporting.
*
* @param interval reporting interval
* @param unit time unit
*/
public void start(long interval, TimeUnit unit) {
try {
Files.createDirectories(outputFile.getParent());
writer = Files.newBufferedWriter(outputFile,
StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
writer.write(CSV_HEADER);
writer.newLine();
writer.flush();
} catch (IOException e) {
logger.error("Failed to create netty pool metrics CSV: {}", outputFile, e);
return;
}

scheduler.scheduleAtFixedRate(this::report, interval, interval, unit);
logger.info("NettyHttpMetricsReporter started -> {} (every {}s)", outputFile, unit.toSeconds(interval));
}

/**
* Write a single snapshot of all pool metrics to CSV.
*/
public void report() {
if (writer == null) return;

String timestamp = Instant.now().toString();
int count = 0;

try {
for (Meter meter : registry.getMeters()) {
String name = meter.getId().getName();
if (!name.startsWith(METRIC_PREFIX)) continue;

// Only report gauge-type metrics (connections counts)
if (!(meter instanceof Gauge)) continue;

double value = ((Gauge) meter).value();
String poolId = meter.getId().getTag("id");
String poolName = meter.getId().getTag("name");
String remoteAddr = meter.getId().getTag("remote.address");

// Strip the common prefix for shorter metric names in CSV
String shortName = name.substring(METRIC_PREFIX.length() + 1);

writer.write(String.format("%s,%s,%s,%s,%s,%.0f",
timestamp, shortName,
poolId != null ? poolId : "",
poolName != null ? poolName : "",
remoteAddr != null ? remoteAddr : "",
value));
writer.newLine();
count++;
}
writer.flush();
} catch (IOException e) {
logger.warn("Failed to write netty pool metrics", e);
}

if (count > 0) {
logger.debug("NettyHttpMetricsReporter: wrote {} pool metrics", count);
}
}

/**
* Stop the reporter and close the CSV file.
*/
public void stop() {
scheduler.shutdown();
try {
scheduler.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

// Final snapshot
report();

if (writer != null) {
try {
writer.close();
logger.info("NettyHttpMetricsReporter stopped. Output: {}", outputFile);
} catch (IOException e) {
logger.warn("Failed to close netty pool metrics CSV", e);
}
}
}
}
Loading
Loading