From 5ce8f5fbe341710e73cfcc6333048eb7b7dd7c61 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Wed, 6 May 2026 04:13:53 +0000 Subject: [PATCH] feat(ha): Add per-tenant configurable failover timeout Add a per-tenant runtime override for the HA tracker failover timeout via the ha_tracker_failover_timeout field in the limits config (flag: -distributor.ha-tracker.failover-timeout-override). When set to a non-zero value for a tenant, it overrides the global -distributor.ha-tracker.failover-timeout. This allows operators to configure different failover timeouts for different tenants based on their HA setup requirements. Signed-off-by: Ben Ye --- CHANGELOG.md | 1 + docs/configuration/config-file-reference.md | 5 +++++ pkg/ha/ha_tracker.go | 16 +++++++++++++--- pkg/ha/ha_tracker_http.go | 9 ++++++++- pkg/ha/ha_tracker_test.go | 5 +++++ pkg/util/validation/exporter_test.go | 1 + pkg/util/validation/limits.go | 8 ++++++++ schemas/cortex-config-schema.json | 7 +++++++ 8 files changed, 48 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1a7640e63d8..bfd739792ff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## master / unreleased * [CHANGE] Querier: Make query time range configurations per-tenant: `query_ingesters_within`, `query_store_after`, and `shuffle_sharding_ingesters_lookback_period`. Uses `model.Duration` instead of `time.Duration` to support serialization but has minimum unit of 1ms (nanoseconds/microseconds not supported). #7160 * [CHANGE] Cache: Setting `-blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl` to 0 will disable the bucket-index cache. #7446 +* [CHANGE] HA Tracker: Add per-tenant configurable failover timeout via `-distributor.ha-tracker.failover-timeout-override` runtime config option. When set to a non-zero value, overrides the global `-distributor.ha-tracker.failover-timeout` for that tenant. #7481 * [FEATURE] Ruler: Add per-tenant `ruler_alert_generator_url_template` runtime config option to customize alert generator URLs using Go templates. Supports Grafana Explore, Perses, and other UIs. #7302 * [FEATURE] Distributor: Add experimental `-distributor.enable-start-timestamp` flag for Prometheus Remote Write 2.0. When enabled, `StartTimestamp (ST)` is ingested. #7371 * [FEATURE] Memberlist: Add `-memberlist.cluster-label` and `-memberlist.cluster-label-verification-disabled` to prevent accidental cross-cluster gossip joins and support rolling label rollout. #7385 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index f9c206ac537..988a37e68b1 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4027,6 +4027,11 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s # CLI flag: -distributor.ha-tracker.max-clusters [ha_max_clusters: | default = 0] +# Per-tenant failover timeout for the HA tracker. If set to 0, the global +# -distributor.ha-tracker.failover-timeout value is used. +# CLI flag: -distributor.ha-tracker.failover-timeout-override +[ha_tracker_failover_timeout: | default = 0s] + # This flag can be used to specify label names that to drop during sample # ingestion within the distributor and can be repeated in order to drop multiple # labels. diff --git a/pkg/ha/ha_tracker.go b/pkg/ha/ha_tracker.go index d798e07d184..3c36d4d9978 100644 --- a/pkg/ha/ha_tracker.go +++ b/pkg/ha/ha_tracker.go @@ -40,6 +40,10 @@ type HATrackerLimits interface { // MaxHAReplicaGroups returns max number of replica groups that HA tracker should track for a user. // Samples from additional replicaGroups are rejected. MaxHAReplicaGroups(user string) int + + // HATrackerFailoverTimeout returns the failover timeout for a user. + // If 0, the global config value is used. + HATrackerFailoverTimeout(user string) time.Duration } // ProtoReplicaDescFactory makes new InstanceDescs @@ -612,7 +616,7 @@ func (c *HATracker) CheckReplica(ctx context.Context, userID, replicaGroup, repl } } - err := c.checkKVStore(ctx, key, replica, now) + err := c.checkKVStore(ctx, key, replica, userID, now) c.kvCASCalls.WithLabelValues(userID, replicaGroup).Inc() if err != nil { // The callback within checkKVStore will return a ReplicasNotMatchError if the sample is being deduped, @@ -624,7 +628,7 @@ func (c *HATracker) CheckReplica(ctx context.Context, userID, replicaGroup, repl return err } -func (c *HATracker) checkKVStore(ctx context.Context, key, replica string, now time.Time) error { +func (c *HATracker) checkKVStore(ctx context.Context, key, replica, userID string, now time.Time) error { return c.client.CAS(ctx, key, func(in any) (out any, retry bool, err error) { if desc, ok := in.(*ReplicaDesc); ok && desc.DeletedAt == 0 { // We don't need to CAS and update the timestamp in the KV store if the timestamp we've received @@ -635,7 +639,13 @@ func (c *HATracker) checkKVStore(ctx context.Context, key, replica string, now t // We shouldn't failover to accepting a new replica if the timestamp we've received this sample at // is less than failover timeout amount of time since the timestamp in the KV store. - if desc.Replica != replica && now.Sub(timestamp.Time(desc.ReceivedAt)) < c.cfg.FailoverTimeout { + failoverTimeout := c.cfg.FailoverTimeout + if c.limits != nil { + if t := c.limits.HATrackerFailoverTimeout(userID); t > 0 { + failoverTimeout = t + } + } + if desc.Replica != replica && now.Sub(timestamp.Time(desc.ReceivedAt)) < failoverTimeout { return nil, false, ReplicasNotMatchError{replica: replica, elected: desc.Replica} } } diff --git a/pkg/ha/ha_tracker_http.go b/pkg/ha/ha_tracker_http.go index 14480f89245..6c3ce843296 100644 --- a/pkg/ha/ha_tracker_http.go +++ b/pkg/ha/ha_tracker_http.go @@ -76,13 +76,20 @@ func (h *HATracker) ServeHTTP(w http.ResponseWriter, req *http.Request) { for key, desc := range h.elected { chunks := strings.SplitN(key, "/", 2) + failoverTimeout := h.cfg.FailoverTimeout + if h.limits != nil { + if t := h.limits.HATrackerFailoverTimeout(chunks[0]); t > 0 { + failoverTimeout = t + } + } + electedReplicas = append(electedReplicas, replica{ UserID: chunks[0], Cluster: chunks[1], Replica: desc.Replica, ElectedAt: timestamp.Time(desc.ReceivedAt), UpdateTime: time.Until(timestamp.Time(desc.ReceivedAt).Add(h.cfg.UpdateTimeout)), - FailoverTime: time.Until(timestamp.Time(desc.ReceivedAt).Add(h.cfg.FailoverTimeout)), + FailoverTime: time.Until(timestamp.Time(desc.ReceivedAt).Add(failoverTimeout)), }) } h.electedLock.RUnlock() diff --git a/pkg/ha/ha_tracker_test.go b/pkg/ha/ha_tracker_test.go index 5376bb91ee8..f67b2758827 100644 --- a/pkg/ha/ha_tracker_test.go +++ b/pkg/ha/ha_tracker_test.go @@ -767,12 +767,17 @@ func TestReplicasNotMatchError(t *testing.T) { type trackerLimits struct { maxReplicaGroups int + failoverTimeout time.Duration } func (l trackerLimits) MaxHAReplicaGroups(_ string) int { return l.maxReplicaGroups } +func (l trackerLimits) HATrackerFailoverTimeout(_ string) time.Duration { + return l.failoverTimeout +} + func TestHATracker_MetricsCleanup(t *testing.T) { t.Parallel() reg := prometheus.NewPedanticRegistry() diff --git a/pkg/util/validation/exporter_test.go b/pkg/util/validation/exporter_test.go index fa24aa4a4f6..215864c5891 100644 --- a/pkg/util/validation/exporter_test.go +++ b/pkg/util/validation/exporter_test.go @@ -59,6 +59,7 @@ func TestOverridesExporter_withConfig(t *testing.T) { cortex_overrides{limit_name="enforce_metadata_metric_name",user="tenant-a"} 1 cortex_overrides{limit_name="enforce_metric_name",user="tenant-a"} 1 cortex_overrides{limit_name="ha_max_clusters",user="tenant-a"} 0 + cortex_overrides{limit_name="ha_tracker_failover_timeout",user="tenant-a"} 0 cortex_overrides{limit_name="ingestion_burst_size",user="tenant-a"} 50000 cortex_overrides{limit_name="ingestion_rate",user="tenant-a"} 25000 cortex_overrides{limit_name="ingestion_tenant_shard_size",user="tenant-a"} 0 diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index a9b92c866c5..358e3565b58 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -138,6 +138,7 @@ type Limits struct { HAClusterLabel string `yaml:"ha_cluster_label" json:"ha_cluster_label"` HAReplicaLabel string `yaml:"ha_replica_label" json:"ha_replica_label"` HAMaxClusters int `yaml:"ha_max_clusters" json:"ha_max_clusters"` + HATrackerFailoverTimeout model.Duration `yaml:"ha_tracker_failover_timeout" json:"ha_tracker_failover_timeout"` DropLabels flagext.StringSlice `yaml:"drop_labels" json:"drop_labels"` MaxLabelNameLength int `yaml:"max_label_name_length" json:"max_label_name_length"` MaxLabelValueLength int `yaml:"max_label_value_length" json:"max_label_value_length"` @@ -281,6 +282,8 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.StringVar(&l.HAClusterLabel, "distributor.ha-tracker.cluster", "cluster", "Prometheus label to look for in samples to identify a Prometheus HA cluster.") f.StringVar(&l.HAReplicaLabel, "distributor.ha-tracker.replica", "__replica__", "Prometheus label to look for in samples to identify a Prometheus HA replica.") f.IntVar(&l.HAMaxClusters, "distributor.ha-tracker.max-clusters", 0, "Maximum number of clusters that HA tracker will keep track of for single user. 0 to disable the limit.") + _ = l.HATrackerFailoverTimeout.Set("0s") + f.Var(&l.HATrackerFailoverTimeout, "distributor.ha-tracker.failover-timeout-override", "Per-tenant failover timeout for the HA tracker. If set to 0, the global -distributor.ha-tracker.failover-timeout value is used.") f.Var((*flagext.StringSliceCSV)(&l.PromoteResourceAttributes), "distributor.promote-resource-attributes", "Comma separated list of resource attributes that should be converted to labels.") f.Var(&l.DropLabels, "distributor.drop-label", "This flag can be used to specify label names that to drop during sample ingestion within the distributor and can be repeated in order to drop multiple labels.") f.BoolVar(&l.EnableTypeAndUnitLabels, "distributor.enable-type-and-unit-labels", false, "EXPERIMENTAL: If true, the __type__ and __unit__ labels are added to metrics. This applies to remote write v2 and OTLP requests.") @@ -1070,6 +1073,11 @@ func (o *Overrides) MaxHAReplicaGroups(user string) int { return o.GetOverridesForUser(user).HAMaxClusters } +// HATrackerFailoverTimeout returns the per-tenant HA tracker failover timeout. +func (o *Overrides) HATrackerFailoverTimeout(user string) time.Duration { + return time.Duration(o.GetOverridesForUser(user).HATrackerFailoverTimeout) +} + // S3SSEType returns the per-tenant S3 SSE type. func (o *Overrides) S3SSEType(user string) string { return o.GetOverridesForUser(user).S3SSEType diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index 50b9a88bb71..69d6f778d38 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -5119,6 +5119,13 @@ "type": "string", "x-cli-flag": "distributor.ha-tracker.replica" }, + "ha_tracker_failover_timeout": { + "default": "0s", + "description": "Per-tenant failover timeout for the HA tracker. If set to 0, the global -distributor.ha-tracker.failover-timeout value is used.", + "type": "string", + "x-cli-flag": "distributor.ha-tracker.failover-timeout-override", + "x-format": "duration" + }, "ingestion_burst_size": { "default": 50000, "description": "Per-user allowed ingestion burst size (in number of samples).",