Add PERCENTILE aggregation function and related validation #16545
FearfulTomcat27 wants to merge 4 commits
d18b6dd to f63a796
Pull request overview
This PR adds an exact percentile aggregation to the IoTDB relational query engine and wires it through parsing/semantic validation, aggregation execution (grouped + non-grouped), RPC aggregation type enums, and integration tests.
Changes:
- Introduces a new exact percentile state container (`Percentile`) plus grouped/non-grouped accumulators.
- Registers `PERCENTILE` across SQL constants, builtin function enums, semantic validation, and accumulator factory creation.
- Adds integration tests for correct percentile results and invalid-usage error handling.
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| iotdb-protocol/thrift-commons/src/main/thrift/common.thrift | Adds PERCENTILE to TAggregationType for RPC/planner/executor integration. |
| iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/TableBuiltinAggregationFunction.java | Registers percentile as a builtin aggregation and sets intermediate type handling. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/SqlConstant.java | Adds the PERCENTILE SQL function name constant. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java | Adds parser-time validation for percentile argument literal type. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java | Adds semantic validation and return-type inference for percentile. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/array/PercentileBigArray.java | Adds grouped state container for percentile states with retained-size tracking. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedPercentileAccumulator.java | Implements grouped exact percentile aggregation accumulation/merge/finalization. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/PercentileAccumulator.java | Implements non-grouped exact percentile aggregation accumulation/merge/finalization. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java | Wires TAggregationType.PERCENTILE to the new accumulators. |
| iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/Percentile.java | Adds the in-memory exact percentile computation + (de)serialization logic. |
| integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationIT.java | Adds integration tests for percentile and invalid-usage error cases. |
    for (int i = 0; i < positionCount; i++) {
      position = selectedPositions[i];
      groupId = groupIds[position];
When a mask is applied, selectedPositions may contain fewer valid entries than mask.getPositionCount(). The loop over selectedPositions should be bounded by mask.getSelectedPositionCount() to avoid aggregating masked-out rows (or stale positions).
    for (int i = 0; i < positionCount; i++) {
      position = selectedPositions[i];
      groupId = groupIds[position];
In the masked (non-selectAll) branch, iterate selectedPositions using mask.getSelectedPositionCount() as the loop bound to avoid aggregating stale/unselected positions.
    Percentile percentile = new Percentile();
    if (size > percentile.capacity) {
      percentile.capacity = size;
      percentile.values = new double[size];
    }
Percentile.deserialize() does not restore the 'sorted' state. Since serialize() writes values in their current internal order (which may be unsorted), leaving the deserialized instance with sorted=true can lead to incorrect getPercentile() results if the deserialized object is queried directly. Set sorted=false on deserialization (or serialize in sorted order) to preserve correctness.
    public void updateRetainedSize(long index, Percentile value) {
      Percentile percentile = array.get(index);
      if (percentile != null) {
        sizeOfPercentile -= percentile.getEstimatedSize();
      }
PercentileBigArray tracks retained size only when an element is set, but Percentile instances can grow their internal arrays as values are added. Since addValue()/merge() mutate the object after retrieval without calling updateRetainedSize, sizeOfPercentile can become arbitrarily stale and under-report memory. Consider updating the retained-size accounting when a Percentile grows (or redesign size tracking to avoid cached sizes for growable objects).
    array.forEach(
        item -> {
          if (item != null) {
            item.clear();
          }
        });
    sizeOfPercentile = 0;
reset() sets sizeOfPercentile = 0 even though the Percentile objects (and their backing arrays) remain allocated. This makes sizeOf()/isEmpty() inaccurate and can cause severe underestimation of operator memory usage after reset. Keep sizeOfPercentile consistent with retained objects (e.g., don't zero it, or recompute it based on current capacities).
Suggested change:

    - array.forEach(
    -     item -> {
    -       if (item != null) {
    -         item.clear();
    -       }
    -     });
    - sizeOfPercentile = 0;
    + sizeOfPercentile = 0;
    + array.forEach(
    +     item -> {
    +       if (item != null) {
    +         item.clear();
    +         sizeOfPercentile += item.getEstimatedSize();
    +       }
    +     });
    if (mask.isSelectAll()) {
      for (int i = 0; i < positionCount; i++) {
        if (!column.isNull(i)) {
          percentile.addValue(column.getLong(i));
        }
INT64/TIMESTAMP inputs are implicitly converted to double when stored in Percentile, which silently loses precision for large INT64 values (magnitude above 2^53) and can return incorrect results. Consider implementing a long-backed path for INT64/TIMESTAMP, or using an exact conversion check (and throwing) when the long value cannot be represented exactly as a double.
    for (int i = 0; i < positionCount; i++) {
      position = selectedPositions[i];
      groupId = groupIds[position];
The non-selectAll path should iterate up to mask.getSelectedPositionCount() (not mask.getPositionCount()) when traversing selectedPositions; otherwise masked-out rows can be included.
    for (int i = 0; i < positionCount; i++) {
      position = selectedPositions[i];
      groupId = groupIds[position];
Looping over selectedPositions should use mask.getSelectedPositionCount() as the bound; using mask.getPositionCount() can include masked-out positions and corrupt results.
    int groupId = groupIds[i];
    Percentile percentile = array.get(groupId);
    if (!valueColumn.isNull(i)) {
      percentile.addValue(valueColumn.getLong(i));
    }
INT64/TIMESTAMP values are implicitly converted to double when stored (via Percentile.addValue(double)). This silently loses precision for values outside the exact double integer range (magnitude above 2^53), producing incorrect percentiles. Consider storing longs separately for INT64/TIMESTAMP or, at minimum, enforcing exact conversion (similar to AbstractApproxPercentileAccumulator.toDoubleExact) and failing fast when precision would be lost.
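The fail-fast option mentioned above could look roughly like the following. This is a minimal sketch, not the code of `AbstractApproxPercentileAccumulator.toDoubleExact`; the class name `ExactDoubleSketch` and the exception type are assumptions made for illustration.

```java
/** Hypothetical helper; not part of the PR or of IoTDB. */
final class ExactDoubleSketch {

  /** Converts a long to double, throwing if the value cannot be represented exactly. */
  static double toDoubleExact(long value) {
    double converted = (double) value;
    // 2^63 lies outside the long range, so a result of 2^63 always means rounding occurred.
    // Otherwise, a lossless conversion must survive the round trip back to long.
    if (converted >= 0x1p63 || (long) converted != value) {
      throw new ArithmeticException(
          "long value " + value + " cannot be represented exactly as a double");
    }
    return converted;
  }
}
```

With this check, `toDoubleExact(1L << 53)` succeeds, while `toDoubleExact((1L << 53) + 1)` and `toDoubleExact(Long.MAX_VALUE)` throw instead of silently rounding.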
f63a796 to ce8a0ef
ce8a0ef to 87cc668
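JackieTien97 left a comment

Thanks for the contribution! This PR adds an exact percentile aggregation function to the table model. The overall structure is clear and consistent with the code style of the existing approx_percentile, and the validation logic (argument count / type / double literal) and the positive and negative tests are fairly complete. Threading MemoryReservationManager through the AccumulatorFactory call sites is also a step in the right direction.

Below are the issues that need attention, ordered by severity (see the inline comments for details):

🔴 Must fix (correctness)
- The four `add*Input` methods in `GroupedPercentileAccumulator` use `mask.getPositionCount()` as the loop bound in the non-selectAll branch; it should be `getSelectedPositionCount()`. A grouped percentile with a FILTER / mask reads invalid positions, producing wrong results or even an ArrayIndexOutOfBoundsException. The non-grouped version and the existing Mode/Avg accumulators all use `getSelectedPositionCount()`.

🟠 Should fix
- `PercentileBigArray.sizeOf()` severely underestimates memory: `sizeOfPercentile` records the initial capacity only once, when a group is first created, and is never updated after growth. This largely defeats the memory reservation logic in this PR (large groups cannot trigger backpressure or prevent OOM).
- The four new files use a shortened License header that is inconsistent with the full ASF header used across the repository; the `license-maven-plugin` check will very likely fail CI.

🟡 / 🔵 Can be handled along the way
- `Percentile.deserialize` does not reset the `sorted` flag (currently covered by merge, but a latent hazard).
- The exact implementation retains and transfers all raw values; `getSerializedSize()` and similar code carry integer-overflow and high-cardinality OOM risks. At minimum, compute with `(long)` and make the trade-off against `approx_percentile` explicit in the documentation/implementation.

Test suggestions
- Add a grouped test case with `percentile(...) FILTER (WHERE ...)` (this can expose the 🔴 issue above);
- Add a distributed/multi-region case that triggers the two-phase partial→final merge (covering the serialize/deserialize/merge path);
- Optional: a large-group memory case that verifies memory reservation takes effect.

The 🔴 item and the License header item should be resolved before merging. The rest can be followed up by priority.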
    public void addIntInput(int[] groupIds, Column[] arguments, AggregationMask mask) {
      Column valueColumn = arguments[0];

      int positionCount = mask.getPositionCount();
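🔴 [Correctness / blocking] This uses `mask.getPositionCount()`, but the loop bound in the else (non-selectAll) branch should be `mask.getSelectedPositionCount()`.
When the aggregation carries a filter mask (e.g. `percentile(x, 0.5) FILTER (WHERE ...)`, or a mask channel introduced by the optimizer; see the `applyMaskBlock` call in `GroupedAggregator`), only the first `selectedPositionCount` elements of the `mask.getSelectedPositions()` array are valid; the rest are leftovers from the previous batch or 0. Iterating up to `positionCount` (> `selectedPositionCount`) reads these invalid indexes: at best, wrong positions such as position 0 are counted repeatedly, silently corrupting grouped results; if the underlying array is too short, an `ArrayIndexOutOfBoundsException` is thrown.
For comparison, the non-grouped `PercentileAccumulator.addIntInput` in this PR and the existing `GroupedModeAccumulator`/`GroupedAvgAccumulator` all use `getSelectedPositionCount()`. The selectAll branch is unaffected because `selectedPositionCount == positionCount` there, so simply switching to `getSelectedPositionCount()` makes both branches correct.
The same problem exists in `addLongInput` (L192), `addFloatInput` (L220), and `addDoubleInput` (L248). The existing ITs do not cover percentile with FILTER, so this was not caught; please add a test case.
Also: these four `add*Input` methods should be made private (they are private in comparable accumulators).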
      array = new ObjectBigArray<>();
    }

    public long sizeOf() {
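🟠 [Memory accounting] The value returned by `sizeOf()` is severely underestimated, which largely defeats the purpose of introducing `MemoryReservationManager` in this PR.
`sizeOfPercentile` is updated only in `updateRetainedSize()`, which is called just once, when `get()` first creates a `Percentile` for a group (via `set()`), and it records the initial capacity (32 doubles). Afterwards the accumulator keeps appending to and growing the `Percentile` through `get(groupId).addValue(...)` / `merge(...)`, but `set()`/`updateRetainedSize()` is never called again, so the real footprint after growth is never counted.
As a result, the memory reserved via `GroupedPercentileAccumulator.getEstimatedSize()` → `updateMemoryReservation()` is far smaller than actual usage; with large groups this neither triggers backpressure nor prevents OOM. The non-grouped `PercentileAccumulator`, by contrast, computes the size live with `percentile.getEstimatedSize()` and does not have this problem.
Suggestion: have `sizeOf()` sum `Percentile.getEstimatedSize()` over all groups on demand, or synchronize `sizeOfPercentile` after every `addValue`/`merge`.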
      }
    }

    public static Percentile deserialize(ByteBuffer buffer) {
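🟡 [Robustness / latent hazard] The object produced by `deserialize` still has `sorted` set to `true` from the constructor, but the data written by `serialize()` is not guaranteed to be ordered (`sorted == false` is the usual case at serialization time). In other words, the deserialized `Percentile` claims to be sorted when it may not be.
This does not yet trigger a bug, because the deserialized result is consumed only through `merge()` (merge reads `values[]` directly and sets the target to `sorted = false`). But it is a time bomb: if a path ever calls `getPercentile()` directly on a deserialized object (for example, an optimization that skips merge for a single partial), `ensureSorted()` will skip sorting because `sorted == true` and silently return a wrong result.
Suggest explicitly setting `percentile.sorted = false;` here (or calling `ensureSorted()` before `serialize()` so that serialized data is always ordered).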
    @@ -0,0 +1,161 @@
    /*
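🟡 [License header / CI risk] Java files in this repository uniformly use the full ASF license header (`Licensed to the Apache Software Foundation (ASF) under one ...`; all 112 of 112 existing files in this package do), whereas the files added in this PR use a shortened Apache header. The repository validates headers with `license-maven-plugin` (see the root `pom.xml`), so this will very likely fail the License check in CI.
Please replace the header in all files added by this PR with the full ASF header: `Percentile.java`, `PercentileAccumulator.java`, `GroupedPercentileAccumulator.java`, `PercentileBigArray.java`. Copying the header from an existing file in the same directory is enough.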
      return percentile;
    }

    public int getSerializedSize() {
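🔵 [Scalability / overflow] Be aware that `percentile` is an exact implementation: it keeps every raw value of a group in memory and, in two-phase aggregation, serializes all of them for transfer between nodes. This is a fundamentally different trade-off from `approx_percentile` (TDigest, constant space).
`getSerializedSize()` uses `size * Double.BYTES` (int arithmetic), which overflows to a negative value once a single group's `size` exceeds roughly 268 million; `ByteBuffer.allocate()` also has a 2 GB limit; and `(int)(capacity * 1.5)` in `grow()` can overflow as well. High-cardinality scenarios carry both OOM and overflow risk.
Suggestion: at minimum change `size * Double.BYTES` to `(long) size * Double.BYTES` to avoid silent overflow; also consider capping the per-group size with a clear error, and document the trade-off against `approx_percentile`.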
…r for percentile aggregation
… for improved memory management
…, memory accounting, license headers, and integer overflow
1. Percentile.deserialize now resets sorted=false to guarantee re-sort
2. PercentileBigArray.reset now recomputes sizeOfPercentile instead of zeroing
3. Fix Apache License header format in 4 new files (ASF full header)
4. Use long arithmetic in getSerializedSize and evaluateIntermediate to avoid overflow
87cc668 to 5b1c70c
This pull request introduces the new `percentile` aggregation function to the IoTDB relational query engine, providing support for calculating percentiles over numeric columns. The changes include the implementation of the core percentile calculation logic, integration into the accumulator factory, and comprehensive tests for both correct behavior and error handling.

Percentile Aggregation Support
- Added `PercentileAccumulator` and the supporting `Percentile` class to implement percentile calculations for numeric types (`INT32`, `INT64`, `FLOAT`, `DOUBLE`, `TIMESTAMP`).
- Integrated the new accumulators into `AccumulatorFactory`, enabling both grouped and non-grouped percentile aggregations.

Testing and Validation
- Added tests in `IoTDBTableAggregationIT.java` for verifying percentile queries and their results, including grouped queries.
- Added tests for invalid usage of the `percentile` function, such as wrong argument count, invalid percentage values, and unsupported data types.