Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ public FixedIntervalFillFilter(long timeInterval) {
public boolean needFill(long time, long previousTime) {
// the reason that we use Math.abs is that we may use order by time desc which will cause
// previousTime is larger than time
return Math.abs(time - previousTime) <= timeInterval;
return !isTimeDistanceGreaterThan(time, previousTime, timeInterval);
}

private static boolean isTimeDistanceGreaterThan(long left, long right, long distance) {
if (distance < 0) {
return true;
}
long difference = left >= right ? left - right : right - left;
return difference < 0 || difference > distance;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,8 @@ private boolean fill(
}

private double getFactor(long currentTime) {
return nextTimeInCurrentColumn - previousTime == 0
? 0.0
: ((double) (currentTime - previousTime)) / (nextTimeInCurrentColumn - previousTime);
double timeRange = (double) nextTimeInCurrentColumn - (double) previousTime;
return timeRange == 0 ? 0.0 : ((double) currentTime - (double) previousTime) / timeRange;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.List;

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.iotdb.calc.transformation.dag.column.unary.scalar.DateBinFunctionColumnTransformer.saturatingAdd;

abstract class AbstractGapFillOperator implements ProcessOperator {

Expand Down Expand Up @@ -125,7 +126,9 @@ public TsBlock next() throws Exception {
// -1 because we should not include current row, current row will be appended in
// writeCurrentRow
long currentEndTime =
timeColumn.isNull(i) ? endTime : block.getColumn(timeColumnIndex).getLong(i) - 1;
timeColumn.isNull(i)
? endTime
: saturatingAdd(block.getColumn(timeColumnIndex).getLong(i), -1);
fillGaps(block, i, currentEndTime);
writeCurrentRow(block, i);
}
Expand Down Expand Up @@ -155,8 +158,12 @@ private void writeCurrentRow(TsBlock block, int rowIndex) {

private void fillGaps(TsBlock block, int rowIndex, long currentEndTime) {
while (currentTime <= currentEndTime) {
long previousTime = currentTime;
gapFillRow(currentTime, block, rowIndex);
nextTime();
if (currentTime <= previousTime) {
break;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.tsfile.block.column.ColumnBuilder;
import org.apache.tsfile.read.common.type.Type;

import java.math.BigInteger;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
Expand All @@ -37,6 +38,8 @@

private static final long NANOSECONDS_IN_MILLISECOND = 1_000_000;
private static final long NANOSECONDS_IN_MICROSECOND = 1_000;
private static final BigInteger BIG_LONG_MIN = BigInteger.valueOf(Long.MIN_VALUE);
private static final BigInteger BIG_LONG_MAX = BigInteger.valueOf(Long.MAX_VALUE);

private final int monthDuration;
private final long nonMonthDuration;
Expand Down Expand Up @@ -149,12 +152,18 @@
return convertToTimestamp(binStart, zoneId);
}

long diff = source - origin;
long n = diff >= 0 ? diff / nonMonthDuration : (diff - nonMonthDuration + 1) / nonMonthDuration;
return origin + (n * nonMonthDuration);
return saturateToLong(getNonMonthDateBinStart(source, origin, nonMonthDuration));
}

public long[] dateBinStartEnd(long source) {
return dateBinStartEnd(source, false);
}

public long[] dateBinStartEndClosed(long source) {
return dateBinStartEnd(source, true);
}

private long[] dateBinStartEnd(long source, boolean closedEnd) {

Check warning on line 166 in iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/transformation/dag/column/unary/scalar/DateBinFunctionColumnTransformer.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

All overloaded methods should be placed next to each other. Placing non-overloaded methods in between overloaded methods with the same type is a violation. Previous overloaded method located at line '158'.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ6xSeLa1wBMKle0X2fL&open=AZ6xSeLa1wBMKle0X2fL&pullRequest=17893
// return source if interval is 0
if (monthDuration == 0 && nonMonthDuration == 0) {
return new long[] {source, source};
Expand All @@ -180,17 +189,17 @@
binStart.minusMonths(monthDuration).minusNanos(getNanoTimeStamp(nonMonthDuration));
}

return new long[] {
convertToTimestamp(binStart, zoneId),
convertToTimestamp(binStart.plusMonths(monthDuration), zoneId)
};
long startTime = convertToTimestamp(binStart, zoneId);
long endTime = convertToTimestamp(binStart.plusMonths(monthDuration), zoneId);
return new long[] {startTime, closedEnd ? saturatingAdd(endTime, -1) : endTime};
}

long diff = source - origin;
long n = diff >= 0 ? diff / nonMonthDuration : (diff - nonMonthDuration + 1) / nonMonthDuration;
return new long[] {
origin + (n * nonMonthDuration), origin + (n * nonMonthDuration) + nonMonthDuration
};
BigInteger startTime = getNonMonthDateBinStart(source, origin, nonMonthDuration);
BigInteger endTime = startTime.add(BigInteger.valueOf(nonMonthDuration));
if (closedEnd) {
endTime = endTime.subtract(BigInteger.ONE);
}
return new long[] {saturateToLong(startTime), saturateToLong(endTime)};
}

public static long nextDateBin(int monthDuration, ZoneId zoneId, long currentTime) {
Expand All @@ -200,7 +209,33 @@
}

public static long nextDateBin(long nonMonthDuration, long currentTime) {
return currentTime + nonMonthDuration;
return saturatingAdd(currentTime, nonMonthDuration);
}

public static long saturatingAdd(long left, long right) {
return saturateToLong(BigInteger.valueOf(left).add(BigInteger.valueOf(right)));
}

private static BigInteger getNonMonthDateBinStart(
long source, long origin, long nonMonthDuration) {
BigInteger diff = BigInteger.valueOf(source).subtract(BigInteger.valueOf(origin));
BigInteger duration = BigInteger.valueOf(nonMonthDuration);
BigInteger[] quotientAndRemainder = diff.divideAndRemainder(duration);
BigInteger n = quotientAndRemainder[0];
if (diff.signum() < 0 && quotientAndRemainder[1].signum() != 0) {
n = n.subtract(BigInteger.ONE);
}
return BigInteger.valueOf(origin).add(n.multiply(duration));
}

private static long saturateToLong(BigInteger value) {
if (value.compareTo(BIG_LONG_MAX) > 0) {
return Long.MAX_VALUE;
}
if (value.compareTo(BIG_LONG_MIN) < 0) {
return Long.MIN_VALUE;
}
return value.longValue();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,23 @@ public class QueryTimeoutRuntimeException extends IoTDBRuntimeException {

public QueryTimeoutRuntimeException(long startTime, long currentTime, long timeout) {
super(
String.format(QUERY_TIMEOUT_EXCEPTION_MESSAGE, startTime, startTime + timeout, currentTime),
String.format(
QUERY_TIMEOUT_EXCEPTION_MESSAGE,
startTime,
saturatingAdd(startTime, timeout),
currentTime),
QUERY_TIMEOUT.getStatusCode(),
true);
}

private static long saturatingAdd(long left, long right) {
long result = left + right;
if (right > 0 && result < left) {
return Long.MAX_VALUE;
}
if (right < 0 && result > left) {
return Long.MIN_VALUE;
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import org.apache.tsfile.utils.Pair;

import java.math.BigInteger;
import java.util.Collections;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -80,10 +81,12 @@ public Set<TimeSeriesWindow> mayAddWindow(
? slidingBoundaryTime
: windowList.get(windowList.size() - 1).getTimestamp();

if (timeStamp >= (windowList.isEmpty() ? lastTime : lastTime + slidingInterval)) {
if (windowList.isEmpty()
? timeStamp >= lastTime
: isTimestampAtOrAfterWindowEnd(timeStamp, lastTime, slidingInterval)) {
final TimeSeriesWindow window = new TimeSeriesWindow(this, null);
// Align to the last time + k * slidingInterval, k is a natural number
window.setTimestamp(((timeStamp - lastTime) / slidingInterval) * slidingInterval + lastTime);
window.setTimestamp(alignWindowStart(timeStamp, lastTime, slidingInterval));
windowList.add(window);
return Collections.singleton(window);
}
Expand All @@ -96,12 +99,12 @@ public Pair<WindowState, WindowOutput> updateAndMaySetWindowState(
if (timeStamp < window.getTimestamp()) {
return new Pair<>(WindowState.IGNORE_VALUE, null);
}
if (timeStamp >= window.getTimestamp() + slidingInterval) {
if (isTimestampAtOrAfterWindowEnd(timeStamp, window.getTimestamp(), slidingInterval)) {
return new Pair<>(
WindowState.EMIT_AND_PURGE_WITHOUT_COMPUTE,
new WindowOutput()
.setTimestamp(window.getTimestamp())
.setProgressTime(window.getTimestamp() + slidingInterval));
.setProgressTime(saturatingAdd(window.getTimestamp(), slidingInterval)));
}
return new Pair<>(WindowState.COMPUTE, null);
}
Expand All @@ -110,6 +113,33 @@ public Pair<WindowState, WindowOutput> updateAndMaySetWindowState(
public WindowOutput forceOutput(final TimeSeriesWindow window) {
return new WindowOutput()
.setTimestamp(window.getTimestamp())
.setProgressTime(window.getTimestamp() + slidingInterval);
.setProgressTime(saturatingAdd(window.getTimestamp(), slidingInterval));
}

private static boolean isTimestampAtOrAfterWindowEnd(
final long timestamp, final long windowStart, final long interval) {
return windowStart <= Long.MAX_VALUE - interval && timestamp >= windowStart + interval;
}

private static long alignWindowStart(
final long timestamp, final long baseTime, final long interval) {
final BigInteger base = BigInteger.valueOf(baseTime);
final BigInteger intervalValue = BigInteger.valueOf(interval);
return base.add(
BigInteger.valueOf(timestamp)
.subtract(base)
.divide(intervalValue)
.multiply(intervalValue))
.longValueExact();
}

private static long saturatingAdd(final long left, final long right) {
if (right > 0 && left > Long.MAX_VALUE - right) {
return Long.MAX_VALUE;
}
if (right < 0 && left < Long.MIN_VALUE - right) {
return Long.MIN_VALUE;
}
return left + right;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.pipe.processor.downsampling;

public class DownSamplingTimeUtils {

private DownSamplingTimeUtils() {
// Utility class.
}

public static boolean isTimeDistanceLessThanOrEqualTo(long left, long right, long distance) {
if (distance < 0) {
return false;
}
long difference = left >= right ? left - right : right - left;
return difference >= 0 && difference <= distance;
}

public static boolean isTimeDistanceGreaterThanOrEqualTo(long left, long right, long distance) {
if (distance < 0) {
return true;
}
long difference = left >= right ? left - right : right - left;
return difference < 0 || difference >= distance;
}

public static double timeDifferenceAsDouble(long left, long right) {
return (double) left - (double) right;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import java.time.LocalDate;
import java.util.Objects;

import static org.apache.iotdb.db.pipe.processor.downsampling.DownSamplingTimeUtils.isTimeDistanceGreaterThanOrEqualTo;
import static org.apache.iotdb.db.pipe.processor.downsampling.DownSamplingTimeUtils.isTimeDistanceLessThanOrEqualTo;

public class ChangingValueFilter<T> {

private final ChangingValueSamplingProcessor processor;
Expand Down Expand Up @@ -59,13 +62,13 @@ public boolean filter(final long timestamp, final T value) {
}

private boolean tryFilter(final long timestamp, final T value) {
final long timeDiff = Math.abs(timestamp - lastStoredTimestamp);

if (timeDiff <= processor.getCompressionMinTimeInterval()) {
if (isTimeDistanceLessThanOrEqualTo(
timestamp, lastStoredTimestamp, processor.getCompressionMinTimeInterval())) {
return false;
}

if (timeDiff >= processor.getCompressionMaxTimeInterval()) {
if (isTimeDistanceGreaterThanOrEqualTo(
timestamp, lastStoredTimestamp, processor.getCompressionMaxTimeInterval())) {
reset(timestamp, value);
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
import java.time.LocalDate;
import java.util.Objects;

import static org.apache.iotdb.db.pipe.processor.downsampling.DownSamplingTimeUtils.isTimeDistanceGreaterThanOrEqualTo;
import static org.apache.iotdb.db.pipe.processor.downsampling.DownSamplingTimeUtils.isTimeDistanceLessThanOrEqualTo;
import static org.apache.iotdb.db.pipe.processor.downsampling.DownSamplingTimeUtils.timeDifferenceAsDouble;

public class SwingingDoorTrendingFilter<T> {

private final SwingingDoorTrendingSamplingProcessor processor;
Expand Down Expand Up @@ -85,14 +89,13 @@ public boolean filter(final long timestamp, final T value) {
}

private boolean tryFilter(final long timestamp, final T value) {
final long timeDiff = timestamp - lastStoredTimestamp;
final long absTimeDiff = Math.abs(timeDiff);

if (absTimeDiff <= processor.getCompressionMinTimeInterval()) {
if (isTimeDistanceLessThanOrEqualTo(
timestamp, lastStoredTimestamp, processor.getCompressionMinTimeInterval())) {
return false;
}

if (absTimeDiff >= processor.getCompressionMaxTimeInterval()) {
if (isTimeDistanceGreaterThanOrEqualTo(
timestamp, lastStoredTimestamp, processor.getCompressionMaxTimeInterval())) {
reset(timestamp, value);
return true;
}
Expand All @@ -114,6 +117,7 @@ private boolean tryFilter(final long timestamp, final T value) {
final double doubleValue = Double.parseDouble(value.toString());
final double lastStoredDoubleValue = Double.parseDouble(lastStoredValue.toString());
final double valueDiff = doubleValue - lastStoredDoubleValue;
final double timeDiff = timeDifferenceAsDouble(timestamp, lastStoredTimestamp);

final double currentUpperSlope = (valueDiff - processor.getCompressionDeviation()) / timeDiff;
if (currentUpperSlope > upperDoor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import static org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_DOWN_SAMPLING_SPLIT_FILE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_TUMBLING_TIME_INTERVAL_SECONDS_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_TUMBLING_TIME_INTERVAL_SECONDS_KEY;
import static org.apache.iotdb.db.pipe.processor.downsampling.DownSamplingTimeUtils.isTimeDistanceGreaterThanOrEqualTo;

@TreeModel
public class TumblingTimeSamplingProcessor extends DownSamplingProcessor {
Expand Down Expand Up @@ -116,7 +117,8 @@ protected void processRow(
final Long lastSampleTime = pathLastObjectCache.getPartialPathLastObject(timeSeriesSuffix);

if (lastSampleTime == null
|| Math.abs(currentRowTime - lastSampleTime) >= intervalInCurrentPrecision) {
|| isTimeDistanceGreaterThanOrEqualTo(
currentRowTime, lastSampleTime, intervalInCurrentPrecision)) {
try {
rowCollector.collectRow(row);

Expand Down
Loading
Loading