Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
458 changes: 450 additions & 8 deletions packaging/src/kubernetes/README.md

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ rules:
- apiGroups: ["apps"]
resources: ["deployments", "statefulsets"]
verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
# Scale subresource for operator-driven autoscaling
- apiGroups: ["apps"]
resources: ["deployments/scale", "statefulsets/scale"]
verbs: ["get", "update", "patch"]
# Jobs for schema initialization
- apiGroups: ["batch"]
resources: ["jobs"]
Expand All @@ -46,7 +50,11 @@ rules:
- apiGroups: [""]
resources: ["events"]
verbs: ["create", "patch"]
# Pods: read-only for readiness checking
# Pods: read + patch (patch needed for pod-deletion-cost annotation)
- apiGroups: [""]
resources: ["pods"]
verbs: ["get", "list", "watch"]
verbs: ["get", "list", "watch", "patch"]
# PodDisruptionBudgets for graceful autoscaling
- apiGroups: ["policy"]
resources: ["poddisruptionbudgets"]
verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,18 @@ spec:
extraVolumeMounts:
{{- toYaml .Values.cluster.metastore.extraVolumeMounts | nindent 6 }}
{{- end }}
{{- if and .Values.cluster.metastore.autoscaling .Values.cluster.metastore.autoscaling.enabled }}
autoscaling:
enabled: true
minReplicas: {{ .Values.cluster.metastore.autoscaling.minReplicas }}
scaleUpThreshold: {{ .Values.cluster.metastore.autoscaling.scaleUpThreshold }}
scaleUpStabilizationSeconds: {{ .Values.cluster.metastore.autoscaling.scaleUpStabilizationSeconds }}
scaleDownStabilizationSeconds: {{ .Values.cluster.metastore.autoscaling.scaleDownStabilizationSeconds }}
gracePeriodSeconds: {{ .Values.cluster.metastore.autoscaling.gracePeriodSeconds }}
metricsScrapeIntervalSeconds: {{ .Values.cluster.metastore.autoscaling.metricsScrapeIntervalSeconds | default 10 }}
cpuScaleUpThreshold: {{ .Values.cluster.metastore.autoscaling.cpuScaleUpThreshold | default 90 }}
cpuScaleDownThreshold: {{ .Values.cluster.metastore.autoscaling.cpuScaleDownThreshold | default 30 }}
{{- end }}
{{- else }}
{{- if .Values.cluster.metastore.externalUri }}
externalUri: {{ .Values.cluster.metastore.externalUri | quote }}
Expand Down Expand Up @@ -96,6 +108,18 @@ spec:
extraVolumeMounts:
{{- toYaml .Values.cluster.hiveServer2.extraVolumeMounts | nindent 6 }}
{{- end }}
{{- if and .Values.cluster.hiveServer2.autoscaling .Values.cluster.hiveServer2.autoscaling.enabled }}
autoscaling:
enabled: true
minReplicas: {{ .Values.cluster.hiveServer2.autoscaling.minReplicas }}
scaleUpThreshold: {{ .Values.cluster.hiveServer2.autoscaling.scaleUpThreshold }}
scaleUpStabilizationSeconds: {{ .Values.cluster.hiveServer2.autoscaling.scaleUpStabilizationSeconds }}
scaleDownStabilizationSeconds: {{ .Values.cluster.hiveServer2.autoscaling.scaleDownStabilizationSeconds }}
gracePeriodSeconds: {{ .Values.cluster.hiveServer2.autoscaling.gracePeriodSeconds }}
metricsScrapeIntervalSeconds: {{ .Values.cluster.hiveServer2.autoscaling.metricsScrapeIntervalSeconds | default 10 }}
cpuScaleUpThreshold: {{ .Values.cluster.hiveServer2.autoscaling.cpuScaleUpThreshold | default 90 }}
cpuScaleDownThreshold: {{ .Values.cluster.hiveServer2.autoscaling.cpuScaleDownThreshold | default 30 }}
{{- end }}

llap:
enabled: {{ .Values.cluster.llap.enabled }}
Expand All @@ -120,6 +144,16 @@ spec:
extraVolumeMounts:
{{- toYaml .Values.cluster.llap.extraVolumeMounts | nindent 6 }}
{{- end }}
{{- if and .Values.cluster.llap.autoscaling .Values.cluster.llap.autoscaling.enabled }}
autoscaling:
enabled: true
minReplicas: {{ .Values.cluster.llap.autoscaling.minReplicas }}
scaleUpThreshold: {{ .Values.cluster.llap.autoscaling.scaleUpThreshold }}
scaleUpStabilizationSeconds: {{ .Values.cluster.llap.autoscaling.scaleUpStabilizationSeconds }}
scaleDownStabilizationSeconds: {{ .Values.cluster.llap.autoscaling.scaleDownStabilizationSeconds }}
gracePeriodSeconds: {{ .Values.cluster.llap.autoscaling.gracePeriodSeconds }}
metricsScrapeIntervalSeconds: {{ .Values.cluster.llap.autoscaling.metricsScrapeIntervalSeconds | default 10 }}
{{- end }}
{{- end }}

tezAm:
Expand All @@ -146,6 +180,15 @@ spec:
extraVolumeMounts:
{{- toYaml .Values.cluster.tezAm.extraVolumeMounts | nindent 6 }}
{{- end }}
{{- if and .Values.cluster.tezAm.autoscaling .Values.cluster.tezAm.autoscaling.enabled }}
autoscaling:
enabled: true
minReplicas: {{ .Values.cluster.tezAm.autoscaling.minReplicas }}
scaleUpStabilizationSeconds: {{ .Values.cluster.tezAm.autoscaling.scaleUpStabilizationSeconds }}
scaleDownStabilizationSeconds: {{ .Values.cluster.tezAm.autoscaling.scaleDownStabilizationSeconds }}
gracePeriodSeconds: {{ .Values.cluster.tezAm.autoscaling.gracePeriodSeconds }}
metricsScrapeIntervalSeconds: {{ .Values.cluster.tezAm.autoscaling.metricsScrapeIntervalSeconds | default 10 }}
{{- end }}
{{- end }}

zookeeper:
Expand Down
48 changes: 48 additions & 0 deletions packaging/src/kubernetes/helm/hive-operator/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,19 @@ cluster:
configOverrides: {}
extraVolumes: []
extraVolumeMounts: []
# Autoscaling (operator-driven, no external dependencies)
# The operator scrapes JMX Exporter metrics from pods directly.
# When enabled, 'replicas' above acts as the max replica ceiling.
autoscaling:
enabled: false
minReplicas: 1
scaleUpThreshold: 100
scaleUpStabilizationSeconds: 60
scaleDownStabilizationSeconds: 300
gracePeriodSeconds: 60
metricsScrapeIntervalSeconds: 10
cpuScaleUpThreshold: 90
cpuScaleDownThreshold: 30
# Set to use an external Metastore instead of deploying one:
# enabled: false
# externalUri: "thrift://external-metastore:9083"
Expand All @@ -127,6 +140,18 @@ cluster:
externalJars: []
extraVolumes: []
extraVolumeMounts: []
# Autoscaling (operator-driven, no external dependencies)
# When enabled, 'replicas' above acts as the max replica ceiling
autoscaling:
enabled: false
minReplicas: 1
scaleUpThreshold: 100
scaleUpStabilizationSeconds: 60
scaleDownStabilizationSeconds: 600
gracePeriodSeconds: 300
metricsScrapeIntervalSeconds: 10
cpuScaleUpThreshold: 90
cpuScaleDownThreshold: 30

# ---------------------------------------------------------------------------
# LLAP — enabled by default for full-HA
Expand All @@ -141,6 +166,17 @@ cluster:
configOverrides: {}
extraVolumes: []
extraVolumeMounts: []
# Autoscaling (operator-driven, no external dependencies)
# minReplicas: 0 enables scale-to-zero — scales up when HS2 has active sessions
# When enabled, 'replicas' above acts as the max replica ceiling
autoscaling:
enabled: false
minReplicas: 0
scaleUpThreshold: 10
scaleUpStabilizationSeconds: 60
scaleDownStabilizationSeconds: 900
gracePeriodSeconds: 600
metricsScrapeIntervalSeconds: 10

# ---------------------------------------------------------------------------
# TEZ AM — enabled by default for full-HA
Expand All @@ -154,3 +190,15 @@ cluster:
configOverrides: {}
extraVolumes: []
extraVolumeMounts: []
# Autoscaling (operator-driven, no external dependencies)
# minReplicas: 0 enables scale-to-zero — wakes when HS2 receives queries
# When enabled, 'replicas' above acts as the max replica ceiling
# TezAM scales demand-based: max(totalSessions, hs2Pods * sessionsPerQueue)
# No scaleUpThreshold needed — scaling is 1:1 with session demand
autoscaling:
enabled: false
minReplicas: 0
scaleUpStabilizationSeconds: 60
scaleDownStabilizationSeconds: 600
gracePeriodSeconds: 120
metricsScrapeIntervalSeconds: 10
1 change: 1 addition & 0 deletions packaging/src/kubernetes/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@
<executable>docker</executable>
<arguments>
<argument>build</argument>
<argument>--no-cache</argument>
<argument>-t</argument>
<argument>apache/hive:operator-${project.version}</argument>
<argument>.</argument>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@
package org.apache.hive.kubernetes.operator;

import io.javaoperatorsdk.operator.Operator;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.config.ResolvedControllerConfiguration;
import org.apache.hive.kubernetes.operator.model.HiveCluster;
import org.apache.hive.kubernetes.operator.reconciler.HiveClusterReconciler;
import org.apache.hive.kubernetes.operator.reconciler.HiveWorkflowSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -36,7 +40,16 @@ private HiveOperatorMain() {
public static void main(String[] args) {
// Entry point: builds the operator, attaches the programmatic workflow spec to the
// reconciler's controller configuration, registers the reconciler and starts the operator.
// The args array is not read anywhere in this method.
LOG.info("Starting Hive Kubernetes Operator");
Operator operator = new Operator();
// NOTE(review): a second register(...) call with an explicit configuration follows below.
// This bare registration looks like the pre-change line of a rendered diff — confirm that
// only one registration remains in the real file.
operator.register(new HiveClusterReconciler());
HiveClusterReconciler reconciler = new HiveClusterReconciler();
// Get the annotation-derived base config, then inject our programmatic workflow spec.
ControllerConfiguration<HiveCluster> baseConfig =
operator.getConfigurationService().getConfigurationFor(reconciler);
HiveWorkflowSpec workflowSpec = new HiveWorkflowSpec();
// NOTE(review): unchecked downcast — assumes the configuration service hands back a
// ResolvedControllerConfiguration; a different implementation would fail here with a
// ClassCastException. Confirm against the operator SDK version in use.
((ResolvedControllerConfiguration<HiveCluster>) baseConfig)
.setWorkflowSpec(workflowSpec);
LOG.info("Registered workflow with {} dependent resource specs",
workflowSpec.getDependentResourceSpecs().size());
// Register with the mutated configuration so the injected workflow spec is the one used.
operator.register(reconciler, baseConfig);
operator.start();
LOG.info("Hive Kubernetes Operator started successfully");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hive.kubernetes.operator.autoscaling;

import java.time.Duration;
import java.util.List;

import org.apache.hive.kubernetes.operator.model.spec.AutoscalingSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Per-component autoscaler state.
 *
 * <p>Owns the component's {@link ScalingStrategy} and its scale-up and scale-down
 * stabilization windows, and combines the strategy's recommendation with an optional
 * CPU-based recommendation to decide whether the replica count should be patched.
 *
 * <p>Instances hold mutable state and perform no synchronization; callers are presumably
 * expected to invoke {@link #evaluate} for a given component from a single thread at a time.
 */
public class ComponentAutoscaler {

  /**
   * Result of an autoscaling evaluation.
   *
   * @param rawMetricValue metric value reported by the strategy for this evaluation
   * @param cpuPercent average CPU percentage across the sampled pods, or 0 when CPU scaling
   *     did not apply or no usable sample was available
   * @param cpuProposedReplicas replica count proposed by the CPU rule (0 when not applicable)
   * @param proposedReplicas combined recommendation after clamping to [min, max]
   * @param patchTo replica count to patch to, or {@code null} when no change is needed
   */
  public record EvaluationResult(int rawMetricValue, double cpuPercent,
      int cpuProposedReplicas, int proposedReplicas, Integer patchTo) {}

  private static final Logger LOG = LoggerFactory.getLogger(ComponentAutoscaler.class);

  private static final String METRIC_CPU_LOAD = "jvm_process_cpu_load";

  private final String component;
  private final ScalingStrategy strategy;
  private final boolean cpuScalingApplicable;
  private StabilizationWindow scaleUpWindow;
  private StabilizationWindow scaleDownWindow;
  private int lastScaleUpStabilization = -1;
  private int lastScaleDownStabilization = -1;
  /** True once the current scale-down window has been seeded with the live replica count. */
  private boolean initialized;
  private double lastCpuPercent;

  /**
   * Creates the autoscaler for one component.
   *
   * @param component component name; CPU-based scaling is only applied for
   *     {@code "hiveserver2"} and {@code "metastore"}
   * @param strategy strategy that turns scraped metrics into a desired replica count
   */
  public ComponentAutoscaler(String component, ScalingStrategy strategy) {
    this.component = component;
    this.strategy = strategy;
    this.cpuScalingApplicable = "hiveserver2".equals(component) || "metastore".equals(component);
  }

  /** Whether the underlying strategy uses scaleUpThreshold for scaling decisions. */
  public boolean usesScaleUpThreshold() {
    return strategy.usesScaleUpThreshold();
  }

  /**
   * Evaluate metrics and return the evaluation result containing the raw metric value,
   * the proposed replicas, and the actual patch ({@code null} if no change).
   *
   * @param metrics per-pod metric samples scraped for this component
   * @param spec autoscaling settings (min replicas, thresholds, stabilization windows)
   * @param currentReplicas replica count currently configured for the component
   * @param maxReplicas upper bound for the replica count
   * @return the evaluation outcome; {@code patchTo} is {@code null} when the target equals
   *     {@code currentReplicas}
   */
  public EvaluationResult evaluate(List<PodMetrics> metrics, AutoscalingSpec spec,
      int currentReplicas, int maxReplicas) {

    ensureWindows(spec);

    // Seed a fresh scale-down window with currentReplicas. This prevents an immediate
    // scale-down when the window has no history: after operator restart, and also after
    // the window was rebuilt because scaleDownStabilizationSeconds changed.
    if (!initialized) {
      initialized = true;
      scaleDownWindow.record(currentReplicas);
      LOG.debug("[{}] Initialized scale-down window with currentReplicas={}", component, currentReplicas);
    }

    int rawDesired = strategy.computeDesiredReplicas(metrics, spec, maxReplicas);
    int metricValue = strategy.lastMetricValue();

    // CPU-based scaling: combine with metric-based desired via max()
    int cpuDesired = computeCpuDesired(metrics, spec, currentReplicas);
    int combined = Math.max(rawDesired, cpuDesired);
    int clamped = Math.max(spec.minReplicas(), Math.min(combined, maxReplicas));

    scaleUpWindow.record(clamped);
    scaleDownWindow.record(clamped);

    int target;
    if (clamped > currentReplicas) {
      // Scale up: use stabilized max (highest recommendation in window — don't under-scale)
      target = scaleUpWindow.stabilizedMax();
    } else if (clamped < currentReplicas) {
      // Scale down: use the highest (most conservative) recommendation in the window so a
      // brief dip does not shrink the component. The window duration acts as the cooldown.
      // Cap at currentReplicas: a stale higher recommendation still inside the window must
      // only be able to hold the current size, never turn a scale-down into a scale-up.
      target = Math.min(currentReplicas, scaleDownWindow.stabilizedMax());
    } else {
      target = currentReplicas;
    }

    // Ensure target is still within bounds
    target = Math.max(spec.minReplicas(), Math.min(target, maxReplicas));

    if (target == currentReplicas) {
      return new EvaluationResult(metricValue, lastCpuPercent, cpuDesired, clamped, null);
    }

    if (target < currentReplicas) {
      LOG.info("[{}] Scaling down: {} -> {}", component, currentReplicas, target);
    } else {
      LOG.info("[{}] Scaling up: {} -> {}", component, currentReplicas, target);
    }
    return new EvaluationResult(metricValue, lastCpuPercent, cpuDesired, clamped, target);
  }

  /**
   * Compute desired replicas based on CPU utilization.
   *
   * <p>Also updates {@code lastCpuPercent} with the average used for the decision.
   *
   * @return 0 if CPU scaling is not applicable or no usable CPU data is available;
   *     otherwise the replica count proposed by the CPU thresholds
   */
  private int computeCpuDesired(List<PodMetrics> metrics, AutoscalingSpec spec, int currentReplicas) {
    if (!cpuScalingApplicable || spec.cpuScaleUpThreshold() <= 0
        || metrics == null || metrics.isEmpty()) {
      lastCpuPercent = 0;
      return 0;
    }

    double totalCpu = 0;
    int count = 0;
    for (PodMetrics pm : metrics) {
      Double cpu = pm.metrics().get(METRIC_CPU_LOAD);
      // Skip pods without a usable sample. The JVM reports a negative process CPU load when
      // the value is not available; averaging that (or NaN) in would corrupt the result.
      if (cpu == null || cpu.isNaN() || cpu < 0) {
        continue;
      }
      // Samples are treated as a fraction and converted to a percentage.
      totalCpu += cpu * 100.0;
      count++;
    }
    if (count == 0) {
      lastCpuPercent = 0;
      return 0;
    }
    double avgCpuPercent = totalCpu / count;
    lastCpuPercent = avgCpuPercent;
    LOG.debug("[{}] CPU raw: totalCpu={}, count={}, avg={}%", component, totalCpu, count, avgCpuPercent);

    if (avgCpuPercent >= spec.cpuScaleUpThreshold()) {
      // Scale up proportionally: how many pods to bring avg below threshold
      return (int) Math.ceil(avgCpuPercent * currentReplicas / spec.cpuScaleUpThreshold());
    } else if (avgCpuPercent < spec.cpuScaleDownThreshold()) {
      // Scale down: current load could fit in fewer pods (sized against the scale-up
      // threshold), but never below the configured minimum.
      int desired = (int) Math.ceil(avgCpuPercent * currentReplicas / spec.cpuScaleUpThreshold());
      return Math.max(desired, spec.minReplicas());
    }
    // Between thresholds: hold current
    return currentReplicas;
  }

  /**
   * Creates the stabilization windows on first use and rebuilds a window whenever its
   * configured duration changes. Rebuilding drops that window's recorded history, so a
   * rebuilt scale-down window is flagged for re-seeding by {@link #evaluate}.
   */
  private void ensureWindows(AutoscalingSpec spec) {
    int upSeconds = spec.scaleUpStabilizationSeconds();
    if (scaleUpWindow == null || lastScaleUpStabilization != upSeconds) {
      scaleUpWindow = new StabilizationWindow(Duration.ofSeconds(upSeconds));
      lastScaleUpStabilization = upSeconds;
    }
    int downSeconds = spec.scaleDownStabilizationSeconds();
    if (scaleDownWindow == null || lastScaleDownStabilization != downSeconds) {
      scaleDownWindow = new StabilizationWindow(Duration.ofSeconds(downSeconds));
      lastScaleDownStabilization = downSeconds;
      // The new window is empty; without re-seeding, the next evaluation could scale down
      // immediately.
      initialized = false;
    }
  }
}
Loading
Loading