Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,11 @@ import com.sksamuel.elastic4s.ElasticApi.{
bucketScriptAggregation,
bucketSelectorAggregation,
cardinalityAgg,
extendedStatsAgg,
maxAgg,
minAgg,
nestedAggregation,
percentilesAgg,
sumAgg,
termsAgg,
topHitsAgg,
Expand Down Expand Up @@ -191,6 +193,11 @@ object ElasticAggregation {
case MAX => aggWithFieldOrScript(maxAgg, (name, s) => maxAgg(name, sourceField).script(s))
case AVG => aggWithFieldOrScript(avgAgg, (name, s) => avgAgg(name, sourceField).script(s))
case SUM => aggWithFieldOrScript(sumAgg, (name, s) => sumAgg(name, sourceField).script(s))
case STDDEV | STDDEV_SAMP | STDDEV_POP | VARIANCE | VAR_SAMP | VAR_POP =>
aggWithFieldOrScript(
extendedStatsAgg,
(name, s) => extendedStatsAgg(name, sourceField).script(s)
)
case th: WindowFunction =>
th.window match {
case COUNT =>
Expand All @@ -212,24 +219,60 @@ object ElasticAggregation {
aggWithFieldOrScript(avgAgg, (name, s) => avgAgg(name, sourceField).script(s))
case SUM =>
aggWithFieldOrScript(sumAgg, (name, s) => sumAgg(name, sourceField).script(s))
case STDDEV | STDDEV_SAMP | STDDEV_POP | VARIANCE | VAR_SAMP | VAR_POP =>
aggWithFieldOrScript(
extendedStatsAgg,
(name, s) => extendedStatsAgg(name, sourceField).script(s)
)
case PERCENTILE_CONT | PERCENTILE_DISC =>
// Both map to ES `percentiles` (TDigest). One call → one percent;
// the requested value column is `sourceField` (PercentileAgg.identifier).
val pct: Seq[Double] = th match {
case p: PercentileAgg => Seq(p.percent)
case _ => Seq.empty
}
aggWithFieldOrScript(
(name, field) => percentilesAgg(name, field).percents(pct),
(name, s) => percentilesAgg(name, sourceField).percents(pct).script(s)
)
case _ =>
val isRanking = th.isInstanceOf[RankingWindow]
val limit = {
th match {
case _: LastValue | _: FirstValue => Some(1)
case _ => th.limit.map(_.limit)
// Ranking: top_hits.size driven by the AST's `limit`,
// populated by the inline `LIMIT N` inside OVER (the shipped
// top-N push-down syntax). When absent, default to ES
// `index.max_inner_result_window` (100); push the desired N
// via `LIMIT N` inside OVER for larger partitions. A
// non-positive LIMIT is meaningless for top-N, so it falls
// back to the default cap rather than emitting size:0.
case _: RankingWindow =>
Some(th.limit.map(_.limit).filter(_ > 0).getOrElse(100))
case _ => th.limit.map(_.limit)
}
}
// Ranking emits fetchSource = only the ORDER BY columns (used
// by the in-memory ordinal assigner to detect ties); `_id`
// comes back automatically as hit metadata. The aggregation
// window (LAST_VALUE / FIRST_VALUE / ARRAY_AGG) keeps the
// existing identifier-name-based fetchSource.
val fetchSourceCols: Array[String] =
if (isRanking) {
th.orderBy.toSeq
.flatMap(_.sorts.map(_.field.name))
.distinct
.toArray
} else {
(th.identifier.name +: th.fields
.filterNot(_.isScriptField)
.filterNot(_.sourceField == th.identifier.name)
.map(_.sourceField)
.distinct).toArray
}
val topHits =
topHitsAgg(aggName)
.fetchSource(
th.identifier.name +: th.fields
.filterNot(_.isScriptField)
.filterNot(_.sourceField == th.identifier.name)
.map(_.sourceField)
.distinct
.toArray,
Array.empty
)
.fetchSource(fetchSourceCols, Array.empty)
.copy(
scripts = th.fields
.filter(_.isScriptField)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import app.softnetwork.elastic.sql.`type`.{
SQLVarchar
}
import app.softnetwork.elastic.sql.config.ElasticSqlConfig
import app.softnetwork.elastic.sql.function.aggregate.COUNT
import app.softnetwork.elastic.sql.function.aggregate.{COUNT, PercentileAgg}
import app.softnetwork.elastic.sql.function.geo.{Distance, Meters}
import app.softnetwork.elastic.sql.operator._
import app.softnetwork.elastic.sql.query._
Expand All @@ -39,6 +39,7 @@ import com.sksamuel.elastic4s.requests.searches.aggs.{
AbstractAggregation,
FilterAggregation,
NestedAggregation,
PercentilesAggregation,
TermsAggregation
}
import com.sksamuel.elastic4s.requests.searches.queries.compound.BoolQuery
Expand Down Expand Up @@ -455,6 +456,37 @@ package object bridge {
request.orderBy.map(_.sorts).getOrElse(Seq.empty)
).minScore(request.score)

/** Merge percentile ElasticAggregations that share a value column / `cont` flag / partition into
* the FIRST of them (the owner): set the owner's ES `percentiles` `percents` to the group's
* sorted-distinct union and drop the delegates. `.percents` is called on the owner's existing
* `PercentilesAggregation`, preserving its field/script. Mirrors
* `SearchApi.toClientAggregations` (both call [[PercentileAgg.coalescePlan]] on the same
* SELECT-ordered items, so they pick the same owner). Only percentiles sharing the same
* partition merge, so a merged agg always distributes to one bucket.
*/
private def coalescePercentileAggs(
aggs: Seq[ElasticAggregation]
): Seq[ElasticAggregation] = {
val items = aggs.collect {
case ea if ea.aggType.isInstanceOf[PercentileAgg] =>
ea.aggName -> ea.aggType.asInstanceOf[PercentileAgg]
}
if (items.size < 2) aggs
else {
val plan = PercentileAgg.coalescePlan(items)
aggs.flatMap { ea =>
if (plan.isDelegate(ea.aggName)) None
else if (plan.isOwner(ea.aggName))
Some(
ea.copy(agg =
ea.agg.asInstanceOf[PercentilesAggregation].percents(plan.mergedPercents(ea.aggName))
)
)
else Some(ea)
}
}
}

implicit def requestToSearchRequest(
request: SingleSearch
)(implicit
Expand All @@ -463,12 +495,14 @@ package object bridge {
): SearchRequest = {
import request._

val aggregations = request.aggregates.map(
ElasticAggregation(
_,
request.having.flatMap(_.criteria),
request.sorts,
request.sqlAggregations
val aggregations = coalescePercentileAggs(
request.aggregates.map(
ElasticAggregation(
_,
request.having.flatMap(_.criteria),
request.sorts,
request.sqlAggregations
)
)
)

Expand Down Expand Up @@ -579,10 +613,15 @@ package object bridge {
case _ => scriptSort.asc()
}
} else {
sort.order match {
val baseSort = sort.order match {
case Some(Desc) => FieldSort(sort.field.aliasOrName).desc()
case _ => FieldSort(sort.field.aliasOrName).asc()
}
sort.nullOrdering match {
case Some(NullsFirst) => baseSort.missing("_first")
case Some(NullsLast) => baseSort.missing("_last")
case None => baseSort
}
}
}
case _ => _search
Expand Down
Loading
Loading