Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4388,6 +4388,42 @@ public void approxPercentileTest() {
DATABASE_NAME);
}

@Test
public void percentileTest() {
  // Case 1: global aggregation, the 0.5 percentile of six columns over the whole table.
  String globalSql =
      "select percentile(time, 0.5),percentile(s1,0.5),percentile(s2,0.5),percentile(s3,0.5),percentile(s4,0.5),percentile(s9,0.5) from table1";
  String[] globalExpected = {
    "2024-09-24T06:15:40.000Z,40,43000,37.5,43.0,2024-09-24T06:15:40.000Z,"
  };
  tableResultSetEqualTest(globalSql, buildHeaders(6), globalExpected, DATABASE_NAME);

  // Case 2: grouped by (time, province), ordered by province and then time. The expected rows
  // show null wherever the aggregated column is null in the expected output.
  String groupedSql =
      "select time,province,percentile(time, 0.5),percentile(s1,0.5),percentile(s2,0.5) from table1 group by 1,2 order by 2,1";
  String[] groupedHeaders = {"time", "province", "_col2", "_col3", "_col4"};
  String[] groupedExpected = {
    "2024-09-24T06:15:30.000Z,beijing,2024-09-24T06:15:30.000Z,30,null,",
    "2024-09-24T06:15:31.000Z,beijing,2024-09-24T06:15:31.000Z,null,31000,",
    "2024-09-24T06:15:35.000Z,beijing,2024-09-24T06:15:35.000Z,null,35000,",
    "2024-09-24T06:15:36.000Z,beijing,2024-09-24T06:15:36.000Z,36,null,",
    "2024-09-24T06:15:40.000Z,beijing,2024-09-24T06:15:40.000Z,40,40000,",
    "2024-09-24T06:15:41.000Z,beijing,2024-09-24T06:15:41.000Z,41,null,",
    "2024-09-24T06:15:46.000Z,beijing,2024-09-24T06:15:46.000Z,null,46000,",
    "2024-09-24T06:15:50.000Z,beijing,2024-09-24T06:15:50.000Z,null,50000,",
    "2024-09-24T06:15:51.000Z,beijing,2024-09-24T06:15:51.000Z,null,null,",
    "2024-09-24T06:15:55.000Z,beijing,2024-09-24T06:15:55.000Z,55,null,",
    "2024-09-24T06:15:30.000Z,shanghai,2024-09-24T06:15:30.000Z,30,null,",
    "2024-09-24T06:15:31.000Z,shanghai,2024-09-24T06:15:31.000Z,null,31000,",
    "2024-09-24T06:15:35.000Z,shanghai,2024-09-24T06:15:35.000Z,null,35000,",
    "2024-09-24T06:15:36.000Z,shanghai,2024-09-24T06:15:36.000Z,36,null,",
    "2024-09-24T06:15:40.000Z,shanghai,2024-09-24T06:15:40.000Z,40,40000,",
    "2024-09-24T06:15:41.000Z,shanghai,2024-09-24T06:15:41.000Z,41,null,",
    "2024-09-24T06:15:46.000Z,shanghai,2024-09-24T06:15:46.000Z,null,46000,",
    "2024-09-24T06:15:50.000Z,shanghai,2024-09-24T06:15:50.000Z,null,50000,",
    "2024-09-24T06:15:51.000Z,shanghai,2024-09-24T06:15:51.000Z,null,null,",
    "2024-09-24T06:15:55.000Z,shanghai,2024-09-24T06:15:55.000Z,55,null,",
  };
  tableResultSetEqualTest(groupedSql, groupedHeaders, groupedExpected, DATABASE_NAME);
}

@Test
public void exceptionTest() {
tableAssertTestFail(
Expand Down Expand Up @@ -4478,6 +4514,22 @@ public void exceptionTest() {
"select 1 as g, approx_percentile(s1,s2,0.5) from table1 group by 1",
"701: Aggregation functions [approx_percentile] do not support weight as INT64 type",
DATABASE_NAME);
tableAssertTestFail(
"select percentile() from table1",
"701: Aggregation functions [percentile] should only have two arguments",
DATABASE_NAME);
tableAssertTestFail(
"select percentile(s1,1.1) from table1",
"701: percentage should be in [0,1], got 1.1",
DATABASE_NAME);
tableAssertTestFail(
"select percentile(s1,'test') from table1",
"701: The second argument of 'percentile' function percentage must be a double literal",
DATABASE_NAME);
tableAssertTestFail(
"select percentile(s5,0.5) from table1",
"701: Aggregation functions [percentile] should have value column as numeric type [INT32, INT64, FLOAT, DOUBLE, TIMESTAMP]",
DATABASE_NAME);
}

// ==================================================================
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 [License 头 / CI 风险] 本仓库 Java 文件统一使用 ASF 长版权头(Licensed to the Apache Software Foundation (ASF) under one ...;本包下现有 112/112 个文件均如此),而本 PR 新增文件用的是简化版 Apache 头。仓库通过 license-maven-plugin 校验头部(见根 pom.xml),这很可能导致 License check CI 失败。

请将本 PR 所有新增文件的头替换为长版 ASF 头:Percentile.java、PercentileAccumulator.java、GroupedPercentileAccumulator.java、PercentileBigArray.java。直接复制同目录现有文件的头即可。

* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.calc.execution.operator.source.relational;

import org.apache.iotdb.commons.exception.SemanticException;

import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;

import java.nio.ByteBuffer;
import java.util.Arrays;

public class Percentile {
private double[] values;
private int size;
private int capacity;
private boolean sorted;

private static final int INITIAL_CAPACITY = 32;
private static final double GROWTH_FACTOR = 1.5;

/** Creates an empty collector whose backing array has {@code INITIAL_CAPACITY} slots. */
public Percentile() {
  this.size = 0;
  // With no samples stored there is nothing to sort yet.
  this.sorted = true;
  this.capacity = INITIAL_CAPACITY;
  this.values = new double[INITIAL_CAPACITY];
}

/**
 * Appends one sample to the collection.
 *
 * @param value the sample to store
 */
public void addValue(double value) {
  // Make room first so the write below always lands inside the backing array.
  ensureCapacity();
  values[size] = value;
  size++;
  // Appending may break the ordering, so the next query has to sort again.
  sorted = false;
}

/**
 * Appends every element of {@code vals} in one bulk copy.
 *
 * @param vals samples to append; {@code null} or an empty array is a no-op
 */
public void addValues(double... vals) {
  final int incoming = (vals == null) ? 0 : vals.length;
  if (incoming == 0) {
    return;
  }

  final int total = size + incoming;
  if (capacity < total) {
    grow(total);
  }

  System.arraycopy(vals, 0, values, size, incoming);
  size = total;
  // The appended samples arrive in arbitrary order.
  sorted = false;
}

/**
 * Appends all samples held by {@code other} to this instance; {@code other} is not modified.
 *
 * @param other the collector to absorb; {@code null} or an empty collector is a no-op
 */
public void merge(Percentile other) {
  final boolean nothingToMerge = (other == null) || (other.size == 0);
  if (nothingToMerge) {
    return;
  }

  final int incoming = other.size;
  final int total = size + incoming;
  if (capacity < total) {
    grow(total);
  }

  // Only the first other.size slots of the source array hold live samples.
  System.arraycopy(other.values, 0, values, size, incoming);
  size = total;
  sorted = false;
}

/**
 * Returns the requested quantile of the collected samples, linearly interpolating between the
 * two neighbouring samples at rank {@code percentile * (size - 1)} of the ascending-sorted data.
 *
 * <p>The samples are sorted lazily on the first query after a modification.
 *
 * @param percentile the quantile to compute, expected in {@code [0, 1]}
 * @return the interpolated value, or {@link Double#NaN} when no sample has been added
 * @throws SemanticException if samples exist and {@code percentile} is NaN or outside
 *     {@code [0, 1]}
 */
public double getPercentile(double percentile) {
  if (size == 0) {
    return Double.NaN;
  }
  // Written as a negated range test so that NaN, for which every comparison is false, is
  // rejected instead of silently producing a NaN result.
  if (!(percentile >= 0.0 && percentile <= 1.0)) {
    throw new SemanticException("percentage should be in [0,1], got " + percentile);
  }

  ensureSorted();

  if (size == 1) {
    return values[0];
  }

  double realIndex = percentile * (size - 1);
  int index = (int) realIndex;

  if (index >= size - 1) {
    return values[size - 1];
  }

  double fraction = realIndex - index;
  double lower = values[index];
  double upper = values[index + 1];

  // The rank falls exactly on a sample: return it without touching the neighbour, otherwise an
  // infinite neighbour would turn the result into NaN (0 * Infinity).
  if (fraction == 0.0) {
    return lower;
  }
  // Equal neighbours need no interpolation; this also avoids Infinity - Infinity = NaN.
  if (lower == upper) {
    return upper;
  }

  return lower + fraction * (upper - lower);
}

/**
 * Returns the number of samples currently stored.
 *
 * @return the sample count
 */
public int getSize() {
  return this.size;
}

/**
 * Discards all samples and returns the collector to its freshly constructed state.
 *
 * <p>A backing array larger than {@code INITIAL_CAPACITY} is replaced by a new array of the
 * initial capacity, so memory taken by a large group is released on reset rather than staying
 * reserved at its historical peak.
 */
public void clear() {
  size = 0;
  sorted = true;
  if (capacity <= INITIAL_CAPACITY) {
    // Already at the initial capacity: keep the existing array.
    return;
  }
  capacity = INITIAL_CAPACITY;
  values = new double[INITIAL_CAPACITY];
}

/** Grows the backing array when it has no free slot left for one more sample. */
private void ensureCapacity() {
  if (size < capacity) {
    return;
  }
  grow(size + 1);
}

/**
 * Replaces the backing array with a larger one and copies the live samples across.
 *
 * <p>The new capacity is the larger of {@code capacity * GROWTH_FACTOR} (truncated to an int)
 * and {@code minCapacity}.
 *
 * @param minCapacity the smallest capacity the caller needs
 */
private void grow(int minCapacity) {
  int scaled = (int) (capacity * GROWTH_FACTOR);
  int target = Math.max(scaled, minCapacity);

  double[] expanded = new double[target];
  // Only the first `size` slots hold live samples.
  System.arraycopy(values, 0, expanded, 0, size);

  capacity = target;
  values = expanded;
}

/** Sorts the live samples in ascending order unless they are already known to be sorted. */
private void ensureSorted() {
  if (sorted || size <= 1) {
    // Note: as in the original, the flag is left untouched when there are fewer than two samples.
    return;
  }
  Arrays.sort(values, 0, size);
  sorted = true;
}

/**
 * Writes this collector to {@code buffer}: the sample count as an int, followed by each live
 * sample as a double in its current internal order (which is not necessarily sorted).
 *
 * @param buffer the destination buffer
 */
public void serialize(ByteBuffer buffer) {
  ReadWriteIOUtils.write(size, buffer);
  int written = 0;
  while (written < size) {
    ReadWriteIOUtils.write(values[written], buffer);
    written++;
  }
}

public static Percentile deserialize(ByteBuffer buffer) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 [健壮性 / 隐患] deserialize 出来的对象 sorted 仍是构造函数里的 true,但 serialize() 写出的数据并不保证有序(序列化时通常 sorted == false)。也就是说反序列化得到的 Percentile 自称“已排序”,实则可能无序。

当前尚未真正触发 bug,因为反序列化结果只通过 merge() 被消费(merge 直接读 values[] 并把目标置为 sorted = false)。但这是个定时炸弹:一旦将来出现直接对反序列化对象调用 getPercentile() 的路径(例如单 partial 跳过 merge 的优化),ensureSorted() 会因 sorted == true 跳过排序而静默返回错误结果。

建议在此显式设置 percentile.sorted = false;(或让 serialize() 前先 ensureSorted(),使序列化数据始终有序)。

int size = ReadWriteIOUtils.readInt(buffer);
Percentile percentile = new Percentile();
if (size > percentile.capacity) {
percentile.capacity = size;
percentile.values = new double[size];
}
Comment on lines +152 to +156

Copilot AI Apr 23, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Percentile.deserialize() does not restore the 'sorted' state. Since serialize() writes values in their current internal order (which may be unsorted), leaving the deserialized instance with sorted=true can lead to incorrect getPercentile() results if the deserialized object is queried directly. Set sorted=false on deserialization (or serialize in sorted order) to preserve correctness.

Copilot uses AI. Check for mistakes.
percentile.size = size;
for (int i = 0; i < size; i++) {
percentile.values[i] = ReadWriteIOUtils.readDouble(buffer);
}
percentile.sorted = false;
return percentile;
}

public int getSerializedSize() {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔵 [可扩展性 / 溢出] 需要意识到 percentile 是精确实现,会在内存中保留分组内全部原始值,并在两阶段聚合时把所有值序列化在节点间传输——这与 approx_percentile(TDigest, 常数空间) 是根本不同的取舍。

getSerializedSize() 中的 size * Double.BYTES(int 运算),当单组 size 超过约 2.68 亿时整型溢出得到负值;ByteBuffer.allocate() 也有 2GB 上限;grow() 中的 (int) (capacity * 1.5) 同样可能溢出。大基数场景既是 OOM 风险也是溢出风险。

建议:至少把 size * Double.BYTES 改为 (long) size * Double.BYTES 避免静默溢出;并考虑对单组规模设上限并给出明确报错,文档中说明与 approx_percentile 的取舍。

return Integer.BYTES + (int) ((long) size * Double.BYTES);
}

/**
 * Estimates the memory held by this collector, in bytes.
 *
 * <p>The estimate is the shallow instance size reported by {@code RamUsageEstimator} plus
 * {@code capacity * Double.BYTES} for the backing array. It is based on the allocated capacity,
 * not on the number of samples actually stored.
 *
 * @return the estimated size in bytes
 */
public long getEstimatedSize() {
  final long shallowBytes = RamUsageEstimator.shallowSizeOfInstance(Percentile.class);
  // Widen before multiplying so a large capacity cannot overflow int arithmetic.
  final long backingArrayBytes = (long) capacity * Double.BYTES;
  return shallowBytes + backingArrayBytes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,14 @@
import org.apache.iotdb.calc.execution.operator.source.relational.aggregation.grouped.GroupedMinByAccumulator;
import org.apache.iotdb.calc.execution.operator.source.relational.aggregation.grouped.GroupedModeAccumulator;
import org.apache.iotdb.calc.execution.operator.source.relational.aggregation.grouped.GroupedRegressionAccumulator;
import org.apache.iotdb.calc.execution.operator.source.relational.aggregation.grouped.GroupedPercentileAccumulator;
import org.apache.iotdb.calc.execution.operator.source.relational.aggregation.grouped.GroupedSumAccumulator;
import org.apache.iotdb.calc.execution.operator.source.relational.aggregation.grouped.GroupedUserDefinedAggregateAccumulator;
import org.apache.iotdb.calc.execution.operator.source.relational.aggregation.grouped.GroupedVarianceAccumulator;
import org.apache.iotdb.calc.execution.operator.source.relational.aggregation.grouped.IntGroupedApproxMostFrequentAccumulator;
import org.apache.iotdb.calc.execution.operator.source.relational.aggregation.grouped.LongGroupedApproxMostFrequentAccumulator;
import org.apache.iotdb.calc.i18n.CalcMessages;
import org.apache.iotdb.calc.plan.planner.memory.MemoryReservationManager;
import org.apache.iotdb.common.rpc.thrift.TAggregationType;
import org.apache.iotdb.commons.queryengine.execution.operator.source.relational.aggregation.grouped.UpdateMemory;
import org.apache.iotdb.commons.queryengine.execution.operator.source.relational.aggregation.grouped.hash.MarkDistinctHash;
Expand Down Expand Up @@ -106,7 +108,8 @@ public static TableAccumulator createAccumulator(
boolean isAggTableScan,
String timeColumnName,
Set<String> measurementColumnNames,
boolean distinct) {
boolean distinct,
MemoryReservationManager memoryReservationManager) {
TableAccumulator result;

// Input expression size of 1 indicates aggregation split has occurred and this is a final
Expand Down Expand Up @@ -166,7 +169,7 @@ public static TableAccumulator createAccumulator(
? new FirstAccumulator(inputDataTypes.get(0), isAggTableScan)
: new FirstDescAccumulator(inputDataTypes.get(0));
} else {
result = createBuiltinAccumulator(aggregationType, inputDataTypes);
result = createBuiltinAccumulator(aggregationType, inputDataTypes, memoryReservationManager);
}

if (distinct) {
Expand All @@ -188,7 +191,8 @@ public static GroupedAccumulator createGroupedAccumulator(
List<Expression> inputExpressions,
Map<String, String> inputAttributes,
boolean ascending,
boolean distinct) {
boolean distinct,
MemoryReservationManager memoryReservationManager) {
GroupedAccumulator result;

if (aggregationType == TAggregationType.UDAF) {
Expand All @@ -197,7 +201,12 @@ public static GroupedAccumulator createGroupedAccumulator(
} else {
result =
createBuiltinGroupedAccumulator(
aggregationType, inputDataTypes, inputExpressions, inputAttributes, ascending);
aggregationType,
inputDataTypes,
inputExpressions,
inputAttributes,
ascending,
memoryReservationManager);
}

if (distinct) {
Expand Down Expand Up @@ -242,7 +251,8 @@ private static GroupedAccumulator createBuiltinGroupedAccumulator(
List<TSDataType> inputDataTypes,
List<Expression> inputExpressions,
Map<String, String> inputAttributes,
boolean ascending) {
boolean ascending,
MemoryReservationManager memoryReservationManager) {
switch (aggregationType) {
case COUNT:
return new GroupedCountAccumulator();
Expand Down Expand Up @@ -326,14 +336,18 @@ private static GroupedAccumulator createBuiltinGroupedAccumulator(
case KURTOSIS:
return new GroupedCentralMomentAccumulator(
inputDataTypes.get(0), CentralMomentAccumulator.MomentType.KURTOSIS);
case PERCENTILE:
return new GroupedPercentileAccumulator(inputDataTypes.get(0), memoryReservationManager);
default:
throw new IllegalArgumentException(
CalcMessages.INVALID_AGGREGATION_FUNCTION + aggregationType);
}
}

public static TableAccumulator createBuiltinAccumulator(
TAggregationType aggregationType, List<TSDataType> inputDataTypes) {
TAggregationType aggregationType,
List<TSDataType> inputDataTypes,
MemoryReservationManager memoryReservationManager) {
switch (aggregationType) {
case COUNT:
return new CountAccumulator();
Expand Down Expand Up @@ -418,6 +432,8 @@ public static TableAccumulator createBuiltinAccumulator(
case KURTOSIS:
return new TableCentralMomentAccumulator(
inputDataTypes.get(0), CentralMomentAccumulator.MomentType.KURTOSIS);
case PERCENTILE:
return new PercentileAccumulator(inputDataTypes.get(0), memoryReservationManager);
default:
throw new IllegalArgumentException(
CalcMessages.INVALID_AGGREGATION_FUNCTION + aggregationType);
Expand Down
Loading