Commit ebda65c

Add weighted handler support to BatchQueue adaptive partitioning (#13801)
`PartitionPolicy.resolve()` now accepts a weighted handler count instead of the raw handler count, and a new `BatchQueue.addHandler(type, handler, weight)` overload lets callers specify a partition weight per handler type.

**L1 (MetricsAggregateWorker):** MAL metrics use weight 0.05 (vs 1.0 for OAL). Rationale: MAL emits ~500 items/type per scrape interval. With 20,000-slot buffers, ~40 MAL types can safely share one partition (20,000 / 500 = 40). Weight 0.05 ≈ 1/20 gives 2x headroom.

**L2 (MetricsPersistentMinWorker):** No weight differentiation. After L1 pre-aggregation, both OAL and MAL have similar per-minute burst patterns.

**Impact (8-core, 642 OAL + 1,247 MAL types):**

|                | L1 Before | L1 After | Reduction |
|----------------|-----------|----------|-----------|
| Partitions     | 1,045     | 452      | 57%       |
| Array overhead | 167 MB    | 72 MB    | 57%       |
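The "L1 After" figures above can be reproduced from the adaptive formula this commit modifies. A minimal standalone sketch (the class name is ours; it assumes the adaptive policy's default multiplier of 25 and 8 bytes of object-reference overhead per buffer slot):

```java
// Recomputes the "L1 After" column of the impact table.
// Assumptions: adaptive threshold = threads * 25 (the documented default
// multiplier), and each ArrayBlockingQueue slot costs one 8-byte reference.
public class L1PartitionFigures {
    public static void main(String[] args) {
        final int threads = 8;
        final int threshold = threads * 25;                 // 200
        final double weighted = 642 * 1.0 + 1247 * 0.05;    // 704.35
        final int effective = (int) Math.ceil(weighted);    // 705
        final int partitions = effective <= threshold
            ? effective                                     // 1:1 below threshold
            : threshold + (effective - threshold) / 2;      // half-rate growth above
        final double overheadMb = partitions * 20_000L * 8 / 1e6;
        // prints: 452 partitions, ~72.3 MB array overhead
        System.out.printf("%d partitions, ~%.1f MB array overhead%n",
                          partitions, overheadMb);
    }
}
```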
1 parent 33ad987 commit ebda65c

7 files changed: 115 additions & 21 deletions


docs/en/changes/changes.md

Lines changed: 1 addition & 0 deletions
```diff
@@ -12,6 +12,7 @@
 * Fix missing `and` keyword in `JDBCEBPFProfilingTaskDAO.getTaskRecord()` SQL query, which caused a syntax error on every invocation.
 * Fix duplicate `TABLE_COLUMN` condition in `JDBCMetadataQueryDAO.findEndpoint()`, which was binding the same parameter twice due to a copy-paste error.
 * Support MCP (Model Context Protocol) observability for Envoy AI Gateway: MCP metrics (request CPM/latency, method breakdown, backend breakdown, initialization latency, capabilities), MCP access log sampling (errors only), `ai_route_type` searchable log tag, and MCP dashboard tabs.
+* Add weighted handler support to `BatchQueue` adaptive partitioning. MAL metrics use weight 0.05 at L1 (vs 1.0 for OAL), reducing partition count and memory overhead when many MAL metric types are registered.

 #### UI
```

oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java

Lines changed: 11 additions & 1 deletion
```diff
@@ -23,6 +23,7 @@
 import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.oap.server.core.analysis.data.MergableBufferedData;
+import org.apache.skywalking.oap.server.core.analysis.meter.Meter;
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
 import org.apache.skywalking.oap.server.library.batchqueue.BatchQueue;
@@ -121,7 +122,16 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
             QUEUE_USAGE_GAUGE = gauge;
         }

-        l1Queue.addHandler(metricsClass, new L1Handler());
+        // OAL metrics receive items on every incoming request (high, continuous throughput),
+        // so each type benefits from a dedicated partition — weight 1.0.
+        // MAL metrics receive items only once per scrape interval (typically 1 emit/min),
+        // producing at most ~500 items per type per burst. With a 20,000-slot buffer,
+        // ~40 MAL types can safely share one partition (20,000 / 500 = 40). We use
+        // weight 0.05 (≈ 1/20) to give 2x headroom over the theoretical sharing limit.
+        // This significantly reduces partition count and memory overhead when many MAL
+        // metric types are registered (e.g., from otel-rules).
+        final double weight = Meter.class.isAssignableFrom(metricsClass) ? 0.05 : 1.0;
+        l1Queue.addHandler(metricsClass, new L1Handler(), weight);
     }

     @Override
```

oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentMinWorker.java

Lines changed: 6 additions & 0 deletions
```diff
@@ -118,6 +118,12 @@ public class MetricsPersistentMinWorker extends MetricsPersistentWorker {
             QUEUE_USAGE_GAUGE = gauge;
         }

+        // No weight differentiation at L2. After L1 pre-aggregation, both OAL and MAL
+        // produce one item per (metric_type × entity) per minute — similar burst patterns
+        // and throughput. The OAL per-request amplification is absorbed by L1, so at L2
+        // there is no meaningful throughput difference to justify partition weight tuning.
+        // L2 buffer is also much smaller (2,000 vs 20,000), so the memory overhead of
+        // extra partitions is modest (~16 MB total vs L1's ~167 MB).
         l2Queue.addHandler(metricsClass, new L2Handler());
     }
```

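The "~16 MB total vs L1's ~167 MB" claim in the comment above can be sanity-checked with a back-of-envelope sketch. Assumptions not stated in this diff: L2 resolves to roughly the same ~1,045 partitions as unweighted L1 did, and each buffer slot costs one 8-byte object reference; the class name is ours.

```java
// Back-of-envelope check of the L2-vs-L1 array overhead comparison.
// Assumes ~1,045 partitions (the unweighted count from the commit message)
// and 8-byte references per buffer slot.
public class L2OverheadSketch {
    public static void main(String[] args) {
        final int partitions = 1_045;
        final double l2Mb = partitions * 2_000L * 8 / 1e6;   // L2: 2,000-slot buffers
        final double l1Mb = partitions * 20_000L * 8 / 1e6;  // L1: 20,000-slot buffers
        // prints: L2 ~16.7 MB, L1 ~167.2 MB
        System.out.printf("L2 ~%.1f MB, L1 ~%.1f MB%n", l2Mb, l1Mb);
    }
}
```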
oap-server/server-library/library-batch-queue/CLAUDE.md

Lines changed: 18 additions & 0 deletions
```diff
@@ -80,6 +80,24 @@ Adaptive growth (default multiplier 25, with 8 threads -> threshold 200):
 - 100 handlers -> 100 partitions (1:1)
 - 500 handlers -> 350 partitions (200 + 300/2)

+### Weighted handlers
+
+`addHandler(type, handler, weight)` allows different handler types to contribute different
+amounts to the partition count. The adaptive formula uses the weighted sum instead of raw
+handler count. Partition assignment remains hash-based (`typeHash()`) — weight only affects
+how many partitions exist, not which partition a type lands on.
+
+L1 uses weight 0.05 for MAL metrics (vs 1.0 for OAL). Rationale: MAL emits ~500 items/type
+per scrape interval. With 20,000-slot buffers, ~40 MAL types can safely share one partition
+(20,000 / 500 = 40). Weight 0.05 ≈ 1/20 gives 2x headroom.
+
+Example (8 threads, 642 OAL + 1,247 MAL):
+- Without weight: 1,889 handlers -> 1,045 partitions (167 MB array overhead at L1)
+- With weight: 642*1.0 + 1,247*0.05 = 705 effective -> 452 partitions (72 MB)
+
+L2 uses default weight 1.0 for all types because after L1 pre-aggregation both OAL and MAL
+have similar per-minute burst patterns.
+
 ## Drain Rebalancing

 Static round-robin partition assignment creates thread imbalance when metric types have varying
```
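The 0.05 weight in the section above follows directly from the stated numbers. A short derivation sketch (the burst size and buffer size come from the text; the 2x headroom factor is the design choice the commit names; the class name is ours):

```java
// Derives the MAL handler weight from the documented sharing arithmetic:
// how many MAL types fit in one buffer, then halve that via a 2x headroom factor.
public class MalWeightSketch {
    public static void main(String[] args) {
        final int bufferSlots = 20_000;   // L1 partition buffer size
        final int malBurst = 500;         // ~items per MAL type per scrape burst
        final int typesPerPartition = bufferSlots / malBurst;   // 40 types can share
        final double headroom = 2.0;      // safety factor chosen in the commit
        final double weight = headroom / typesPerPartition;     // 0.05
        System.out.println(typesPerPartition + " types/partition, weight " + weight);
    }
}
```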

oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/BatchQueue.java

Lines changed: 35 additions & 8 deletions
```diff
@@ -128,6 +128,13 @@ public class BatchQueue<T> {
      */
     private final ConcurrentHashMap<Class<?>, HandlerConsumer<T>> handlerMap;

+    /**
+     * Running weighted sum of registered handlers, used by adaptive partition policy.
+     * Each handler contributes its weight (default 1.0) when registered via
+     * {@link #addHandler(Class, HandlerConsumer, double)}.
+     */
+    private double weightedHandlerCount;
+
     /**
      * Tracks unregistered types that have already been warned about,
      * to avoid flooding the log with repeated errors.
@@ -382,23 +389,43 @@ private void scheduleDrain(final int taskIndex) {
     }

     /**
-     * Register a type-based handler. Items whose {@code getClass()} matches the given
-     * type will be batched together and dispatched to this handler.
+     * Register a type-based handler with default weight 1.0.
+     *
+     * @param type    the class of items to route to this handler
+     * @param handler the consumer that processes batches of the given type
+     * @see #addHandler(Class, HandlerConsumer, double)
+     */
+    public void addHandler(final Class<? extends T> type, final HandlerConsumer<T> handler) {
+        addHandler(type, handler, 1.0);
+    }
+
+    /**
+     * Register a type-based handler with an explicit weight for adaptive partition sizing.
      *
-     * <p>For adaptive partition policies, adding a handler recalculates the partition
-     * count and grows the partition array if needed. For non-adaptive policies the
-     * resolved count never changes, so this is a no-op beyond the registration.
-     * Drain loop threads pick up new partitions on their next cycle via volatile reads.
+     * <p>The weight controls how much this handler contributes to partition growth.
+     * A weight of 1.0 means one handler ≈ one partition (below the adaptive threshold).
+     * A lower weight (e.g., 0.05) means many handlers share a partition, suitable for
+     * low-throughput types. The weighted sum of all handlers replaces the raw handler
+     * count in the adaptive partition formula.
+     *
+     * <p>For non-adaptive partition policies the weight is ignored and this behaves
+     * the same as {@link #addHandler(Class, HandlerConsumer)}.
      *
      * @param type    the class of items to route to this handler
      * @param handler the consumer that processes batches of the given type
+     * @param weight  partition weight for this handler (default 1.0). Must be &gt; 0.
      */
     @SuppressWarnings("unchecked")
-    public void addHandler(final Class<? extends T> type, final HandlerConsumer<T> handler) {
+    public void addHandler(final Class<? extends T> type, final HandlerConsumer<T> handler,
+                           final double weight) {
+        if (weight <= 0) {
+            throw new IllegalArgumentException("Handler weight must be > 0, got: " + weight);
+        }
         handlerMap.put(type, handler);
+        weightedHandlerCount += weight;

         final int newPartitionCount = config.getPartitions()
-            .resolve(resolvedThreadCount, handlerMap.size());
+            .resolve(resolvedThreadCount, weightedHandlerCount);
         final ArrayBlockingQueue<T>[] currentPartitions = this.partitions;
         if (newPartitionCount > currentPartitions.length) {
             final int oldCount = currentPartitions.length;
```

oap-server/server-library/library-batch-queue/src/main/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicy.java

Lines changed: 21 additions & 12 deletions
```diff
@@ -31,8 +31,8 @@
  * excess handlers share partitions at 1:2 ratio.</li>
  * </ul>
  *
- * <p>All policies are resolved via {@link #resolve(int, int)}. For non-adaptive
- * policies the handlerCount parameter is ignored. At queue creation time, if the
+ * <p>All policies are resolved via {@link #resolve(int, double)}. For non-adaptive
+ * policies the weightedHandlerCount parameter is ignored. At queue creation time, if the
 * resolved partition count is less than the thread count, the thread count is
 * reduced to match and a warning is logged.
 */
@@ -130,29 +130,38 @@ public static PartitionPolicy adaptive(final int multiplier) {
     * <ul>
     * <li>fixed: returns the pre-set count (both parameters ignored).</li>
     * <li>threadMultiply: returns multiplier * resolvedThreadCount (handlerCount ignored).</li>
-     * <li>adaptive: when handlerCount is 0, returns resolvedThreadCount as a sensible
-     * initial count. Otherwise, threshold = threadCount * multiplier; if handlerCount
-     * &lt;= threshold, returns handlerCount (1:1). If above, returns
-     * threshold + (handlerCount - threshold) / 2.</li>
+     * <li>adaptive: when weightedHandlerCount is 0, returns resolvedThreadCount as a sensible
+     * initial count. Otherwise, threshold = threadCount * multiplier; if weightedHandlerCount
+     * &lt;= threshold, returns weightedHandlerCount (1:1). If above, returns
+     * threshold + (weightedHandlerCount - threshold) / 2.</li>
     * </ul>
     *
     * @param resolvedThreadCount the resolved number of drain threads
-     * @param handlerCount        the current number of registered type handlers
+     * @param weightedHandlerCount the weighted sum of registered type handlers. Each handler
+     *                             contributes its weight (default 1.0) to this sum.
+     *                             High-weight handlers grow the partition count faster,
+     *                             reducing the chance of hash collisions for those types.
+     *                             Low-weight handlers grow the count slowly, so they are
+     *                             more likely to share partitions with other types via
+     *                             {@code typeHash()} routing. Note that partition assignment
+     *                             is hash-based, not weight-based — there is no guarantee
+     *                             that any type gets a dedicated partition.
     * @return the resolved partition count, always &gt;= 1
     */
-    public int resolve(final int resolvedThreadCount, final int handlerCount) {
+    public int resolve(final int resolvedThreadCount, final double weightedHandlerCount) {
        if (fixedCount > 0) {
            return fixedCount;
        }
        if (adaptive) {
-            if (handlerCount == 0) {
+            final int effectiveCount = (int) Math.ceil(weightedHandlerCount);
+            if (effectiveCount == 0) {
                return Math.max(1, resolvedThreadCount);
            }
            final int threshold = Math.max(1, multiplier * resolvedThreadCount);
-            if (handlerCount <= threshold) {
-                return handlerCount;
+            if (effectiveCount <= threshold) {
+                return effectiveCount;
            }
-            return threshold + (handlerCount - threshold) / 2;
+            return threshold + (effectiveCount - threshold) / 2;
        }
        return Math.max(1, multiplier * resolvedThreadCount);
    }
```

oap-server/server-library/library-batch-queue/src/test/java/org/apache/skywalking/oap/server/library/batchqueue/PartitionPolicyTest.java

Lines changed: 23 additions & 0 deletions
```diff
@@ -109,6 +109,29 @@ public void testAdaptiveRejectsInvalidMultiplier() {
             () -> PartitionPolicy.adaptive(0));
     }

+    @Test
+    public void testAdaptiveWithWeightedHandlers() {
+        // Simulate 642 OAL (weight 1.0) + 1247 MAL (weight 0.05)
+        // Weighted count = 642 + 1247 * 0.05 = 642 + 62.35 = 704.35, ceil = 705
+        // 8 threads * 25 = 200 threshold, 705 > 200
+        // Result = 200 + (705 - 200) / 2 = 200 + 252 = 452
+        final double weightedCount = 642 * 1.0 + 1247 * 0.05;
+        assertEquals(452, PartitionPolicy.adaptive().resolve(8, weightedCount));
+    }
+
+    @Test
+    public void testAdaptiveWithLowWeightOnly() {
+        // 100 MAL-only handlers at weight 0.05 = effective 5, ceil = 5
+        // 8 threads, threshold = 200, 5 < 200 -> 1:1 -> 5 partitions
+        assertEquals(5, PartitionPolicy.adaptive().resolve(8, 100 * 0.05));
+    }
+
+    @Test
+    public void testAdaptiveWithZeroWeightedCount() {
+        // weightedCount = 0.0 should return threadCount
+        assertEquals(8, PartitionPolicy.adaptive().resolve(8, 0.0));
+    }
+
     @Test
     public void testToString() {
         assertEquals("fixed(4)", PartitionPolicy.fixed(4).toString());
```
