From 9b57f5299635e4883e19c8325ef96e9502415095 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 4 May 2026 09:01:43 +0000 Subject: [PATCH] feat(ingester): Add active series tracker for pattern-based monitoring Add a new active series tracker feature that counts active series by configurable label matchers (including regex) and exposes the counts as Prometheus metrics. This is designed for internal monitoring without enforcing any limits. Key changes: - Add ActiveSeriesTrackersConfig type in validation package with PromQL matcher syntax support - Add ActiveSeriesTrackers field to Limits struct for per-tenant config with default fallback - Add matchesAll() label-matching helper and a per-TSDB trackerCounter that maintains per-tracker counts incrementally on series creation/deletion - Add cortex_ingester_active_series_per_tracker gauge metric - Integrate into updateActiveSeries() periodic tick - Matchers are validated and compiled during config unmarshalling - Runtime hot-reloadable via existing runtime config overrides Configuration example: overrides: tenant-123: active_series_trackers: - name: api_metrics matchers: '{__name__=~"api_.*"}' Metric emitted: cortex_ingester_active_series_per_tracker{user="tenant", name="api_metrics"} 42 Signed-off-by: Ben Ye --- CHANGELOG.md | 1 + docs/configuration/config-file-reference.md | 15 ++ docs/configuration/v1-guarantees.md | 3 + docs/proposals/active-series-tracker.md | 71 ++++++ integration/active_series_tracker_test.go | 160 ++++++++++++++ pkg/ingester/active_series.go | 10 + pkg/ingester/active_series_tracker_test.go | 206 ++++++++++++++++++ pkg/ingester/ingester.go | 11 + pkg/ingester/metrics.go | 9 + pkg/ingester/tracker_counter.go | 133 +++++++++++ pkg/util/validation/active_series_tracker.go | 61 ++++++ .../validation/active_series_tracker_test.go | 112 ++++++++++ pkg/util/validation/limits.go | 30 ++- schemas/cortex-config-schema.json | 21 ++ 14 files changed, 835 insertions(+), 8 deletions(-) create mode 100644 docs/proposals/active-series-tracker.md create mode 
100644 integration/active_series_tracker_test.go create mode 100644 pkg/ingester/active_series_tracker_test.go create mode 100644 pkg/ingester/tracker_counter.go create mode 100644 pkg/util/validation/active_series_tracker.go create mode 100644 pkg/util/validation/active_series_tracker_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index c4505a58558..d081d9ca6fb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## master / unreleased * [CHANGE] Querier: Make query time range configurations per-tenant: `query_ingesters_within`, `query_store_after`, and `shuffle_sharding_ingesters_lookback_period`. Uses `model.Duration` instead of `time.Duration` to support serialization but has minimum unit of 1ms (nanoseconds/microseconds not supported). #7160 * [CHANGE] Cache: Setting `-blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl` to 0 will disable the bucket-index cache. #7446 +* [FEATURE] Ingester: Add experimental active series tracker that counts active series by configurable label matchers (including regex) per tenant and exposes `cortex_ingester_active_series_per_tracker` metric. Configured via `active_series_trackers` in runtime config overrides. #7476 * [FEATURE] Ruler: Add per-tenant `ruler_alert_generator_url_template` runtime config option to customize alert generator URLs using Go templates. Supports Grafana Explore, Perses, and other UIs. #7302 * [FEATURE] Distributor: Add experimental `-distributor.enable-start-timestamp` flag for Prometheus Remote Write 2.0. When enabled, `StartTimestamp (ST)` is ingested. #7371 * [FEATURE] Memberlist: Add `-memberlist.cluster-label` and `-memberlist.cluster-label-verification-disabled` to prevent accidental cross-cluster gossip joins and support rolling label rollout. 
#7385 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 2091fb52ff9..c521bccfcc6 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4153,6 +4153,10 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s # [max_series] [limits_per_label_set: | default = []] +# List of active series tracker configurations. Each tracker counts active +# series matching its matchers and exposes the count as a metric. +[active_series_trackers: | default = []] + # [EXPERIMENTAL] True to enable native histogram. # CLI flag: -blocks-storage.tsdb.enable-native-histograms [enable_native_histograms: | default = false] @@ -6864,6 +6868,17 @@ limits: [label_set: | default = []] ``` +### `ActiveSeriesTrackerConfig` + +```yaml +# Name of the tracker, used as a label value in the emitted metric. +[name: | default = ""] + +# PromQL series selector (e.g. {__name__=~"api_.*"}). All matchers must match +# for a series to be counted. +[matchers: | default = ""] +``` + ### `PriorityDef` ```yaml diff --git a/docs/configuration/v1-guarantees.md b/docs/configuration/v1-guarantees.md index be3ee78ce59..092d0d18f9b 100644 --- a/docs/configuration/v1-guarantees.md +++ b/docs/configuration/v1-guarantees.md @@ -129,3 +129,6 @@ Currently experimental features are: - `-validation.max-label-cardinality-for-unoptimized-regex` (int) - maximum label cardinality - `-validation.max-total-label-value-length-for-unoptimized-regex` (int) - maximum total length of all label values in bytes - HATracker: `-distributor.ha-tracker.enable-startup-sync` (bool) - If enabled, fetches all tracked keys on startup to populate the local cache. 
+- Ingester: Active Series Tracker + - Per-tenant `active_series_trackers` configuration in runtime config overrides + - Counts active series matching PromQL label matchers and exposes `cortex_ingester_active_series_per_tracker` metric diff --git a/docs/proposals/active-series-tracker.md b/docs/proposals/active-series-tracker.md new file mode 100644 index 00000000000..b549ee5b336 --- /dev/null +++ b/docs/proposals/active-series-tracker.md @@ -0,0 +1,71 @@ +# Active Series Tracker + +## Problem + +AMP needs to monitor active series counts by configurable patterns (e.g., all series with `__name__=~"api_.*"`) for internal observability. The existing `LimitsPerLabelSet` feature is unsuitable because: + +1. **No regex matching** — only supports exact `label=value` matching. +2. **Default partition side-effects** — adding labelset buckets reduces the default partition count. +3. **Coupled to limit enforcement** — designed for enforcing series limits, not pure monitoring. + +## Requirements + +- Track active series counts by configurable label matchers (including regex). +- Expose counts as Prometheus metrics on the ingester (internal only, not vended to customers). +- Configuration supports **per-tenant overrides** with a **default** fallback (same pattern as all other Limits fields). +- **Runtime hot-reloadable** via the existing runtime config file mechanism. +- **No limit enforcement** — purely observational. +- **No default partition** — unmatched series are simply not tracked. +- A series can match multiple tracker entries simultaneously. 
+ + ## Design + + ### Configuration + + Tracker config lives in the `Limits` struct, following the same per-tenant override pattern as `LimitsPerLabelSet`: + + ```yaml + # Default trackers (applied to all tenants without overrides) + limits: + active_series_trackers: + - name: api_metrics + matchers: '{__name__=~"api_.*"}' + + # Per-tenant overrides via runtime config + overrides: + tenant-123: + active_series_trackers: + - name: api_metrics + matchers: '{__name__=~"api_.*"}' + - name: system_metrics + matchers: '{__name__=~"node_.*|process_.*"}' + ``` + + The `matchers` field uses standard PromQL matcher syntax parsed via `parser.ParseMetricSelector`. + + ### Runtime Reload + + Tracker config is part of `Limits`, which is reloaded via the runtime config manager every `runtime-config.reload-period` (default 10s). Matchers are parsed and validated during YAML/JSON unmarshalling. Invalid configs are rejected (existing config stays active). + + ### Metrics + + A new gauge metric emitted per ingester: + + ``` + cortex_ingester_active_series_per_tracker{user="", name=""} + ``` + + ### Matching Logic + + On each active series metrics update tick (default 1min), for each tenant: +1. Read the tenant's tracker config via `i.limits.ActiveSeriesTrackers(userID)` +2. Apply any tracker config changes, backfilling counts from the TSDB head index when matchers changed +3. Emit the gauge metric from counts that are otherwise maintained incrementally on series creation and deletion + +A series can match multiple trackers. Tenants without configured trackers emit no tracker metrics. + +### Performance Considerations + +- Matching runs incrementally on series creation/deletion (never per sample); a full TSDB head scan happens only when the tracker config changes. +- The number of trackers is expected to be small (< 10). +- Compiled matchers are cached in the parsed Limits and only recompiled on config change. 
diff --git a/integration/active_series_tracker_test.go b/integration/active_series_tracker_test.go new file mode 100644 index 00000000000..90689cf2178 --- /dev/null +++ b/integration/active_series_tracker_test.go @@ -0,0 +1,160 @@ +//go:build requires_docker + +package integration + +import ( + "fmt" + "path/filepath" + "testing" + "time" + + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/prompb" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v3" + + "github.com/cortexproject/cortex/integration/e2e" + e2edb "github.com/cortexproject/cortex/integration/e2e/db" + "github.com/cortexproject/cortex/integration/e2ecortex" +) + +func TestActiveSeriesTrackerPerTenant(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // Write runtime config with per-tenant active series trackers. + runtimeConfig := map[string]interface{}{ + "overrides": map[string]interface{}{ + "user-1": map[string]interface{}{ + "active_series_trackers": []map[string]string{ + {"name": "api_metrics", "matchers": `{__name__=~"api_.*"}`}, + {"name": "node_metrics", "matchers": `{__name__=~"node_.*"}`}, + }, + }, + }, + } + runtimeCfgYAML, err := yaml.Marshal(runtimeConfig) + require.NoError(t, err) + require.NoError(t, writeFileToSharedDir(s, runtimeConfigFile, runtimeCfgYAML)) + + flags := BlocksStorageFlags() + flags["-distributor.shard-by-all-labels"] = "true" + flags["-ingester.active-series-metrics-enabled"] = "true" + flags["-ingester.active-series-metrics-update-period"] = "2s" + flags["-ingester.active-series-metrics-idle-timeout"] = "5m" + flags["-runtime-config.file"] = filepath.Join(e2e.ContainerSharedDir, runtimeConfigFile) + flags["-runtime-config.reload-period"] = "1s" + flags["-alertmanager.web.external-url"] = "http://localhost/alertmanager" + flags["-alertmanager-storage.backend"] = "local" + flags["-alertmanager-storage.local.path"] = filepath.Join(e2e.ContainerSharedDir, 
"alertmanager_configs") + + require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{})) + + consul := e2edb.NewConsul() + minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, s.StartAndWaitReady(consul, minio)) + + flags["-ring.store"] = "consul" + flags["-consul.hostname"] = consul.NetworkHTTPEndpoint() + + cortex := e2ecortex.NewSingleBinary("cortex-1", flags, "") + require.NoError(t, s.StartAndWaitReady(cortex)) + + // Wait until the ring is ready. + require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total")) + + c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + + now := time.Now() + for _, name := range []string{"api_requests_total", "api_errors_total", "node_cpu_seconds", "process_memory_bytes"} { + series, _ := generateSeries(name, now, prompb.Label{Name: "job", Value: "test"}) + res, err := c.Push(series) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode, fmt.Sprintf("push %s failed", name)) + } + + // user-1 has trackers: api_metrics (matches 2), node_metrics (matches 1). + require.NoError(t, cortex.WaitSumMetricsWithOptions( + e2e.Equals(2), + []string{"cortex_ingester_active_series_per_tracker"}, + e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "user", "user-1"), + labels.MustNewMatcher(labels.MatchEqual, "name", "api_metrics"), + ), + e2e.WaitMissingMetrics, + )) + + require.NoError(t, cortex.WaitSumMetricsWithOptions( + e2e.Equals(1), + []string{"cortex_ingester_active_series_per_tracker"}, + e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "user", "user-1"), + labels.MustNewMatcher(labels.MatchEqual, "name", "node_metrics"), + ), + e2e.WaitMissingMetrics, + )) + + // user-2 has no trackers configured — should have no tracker metrics. 
+ c2, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-2") + require.NoError(t, err) + + series2, _ := generateSeries("api_requests_total", now, prompb.Label{Name: "job", Value: "test"}) + res, err := c2.Push(series2) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + // Wait for user-2 active series to be counted. + require.NoError(t, cortex.WaitSumMetricsWithOptions( + e2e.Equals(1), + []string{"cortex_ingester_active_series"}, + e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "user", "user-2")), + e2e.WaitMissingMetrics, + )) + + // user-2 should have no tracker metrics. + sum, err := cortex.SumMetrics( + []string{"cortex_ingester_active_series_per_tracker"}, + e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "user", "user-2")), + e2e.SkipMissingMetrics, + ) + require.NoError(t, err) + require.Equal(t, 0.0, sum[0]) + + // Now update runtime config: remove node_metrics tracker for user-1. + runtimeConfig2 := map[string]interface{}{ + "overrides": map[string]interface{}{ + "user-1": map[string]interface{}{ + "active_series_trackers": []map[string]string{ + {"name": "api_metrics", "matchers": `{__name__=~"api_.*"}`}, + }, + }, + }, + } + runtimeCfgYAML2, err := yaml.Marshal(runtimeConfig2) + require.NoError(t, err) + require.NoError(t, writeFileToSharedDir(s, runtimeConfigFile, runtimeCfgYAML2)) + + // Wait for the stale node_metrics tracker metric to be removed. + require.NoError(t, cortex.WaitSumMetricsWithOptions( + e2e.Equals(0), + []string{"cortex_ingester_active_series_per_tracker"}, + e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "user", "user-1"), + labels.MustNewMatcher(labels.MatchEqual, "name", "node_metrics"), + ), + e2e.SkipMissingMetrics, + )) + + // api_metrics tracker should still work. 
+ require.NoError(t, cortex.WaitSumMetricsWithOptions( + e2e.Equals(2), + []string{"cortex_ingester_active_series_per_tracker"}, + e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "user", "user-1"), + labels.MustNewMatcher(labels.MatchEqual, "name", "api_metrics"), + ), + e2e.WaitMissingMetrics, + )) +} diff --git a/pkg/ingester/active_series.go b/pkg/ingester/active_series.go index 57134a03ca6..1a685a36cfa 100644 --- a/pkg/ingester/active_series.go +++ b/pkg/ingester/active_series.go @@ -248,3 +248,13 @@ func (s *activeSeriesStripe) getActiveNativeHistogram() int { return s.activeNativeHistogram } + +// matchesAll returns true if the labels satisfy all given matchers. +func matchesAll(lbs labels.Labels, matchers []*labels.Matcher) bool { + for _, m := range matchers { + if !m.Matches(lbs.Get(m.Name)) { + return false + } + } + return true +} diff --git a/pkg/ingester/active_series_tracker_test.go b/pkg/ingester/active_series_tracker_test.go new file mode 100644 index 00000000000..adcd2a03309 --- /dev/null +++ b/pkg/ingester/active_series_tracker_test.go @@ -0,0 +1,206 @@ +package ingester + +import ( + "context" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/cortexproject/cortex/pkg/util/validation" +) + +func TestActiveSeriesTrackerConfig_Validate(t *testing.T) { + tests := []struct { + name string + cfg validation.ActiveSeriesTrackerConfig + expectErr bool + }{ + {name: "valid exact", cfg: validation.ActiveSeriesTrackerConfig{Name: "test", Matchers: `{__name__="api_requests_total"}`}}, + {name: "valid regex", cfg: validation.ActiveSeriesTrackerConfig{Name: "test", Matchers: `{__name__=~"api_.*"}`}}, + {name: "valid multiple", cfg: validation.ActiveSeriesTrackerConfig{Name: "test", Matchers: `{__name__=~"api_.*", job="gateway"}`}}, 
+ {name: "invalid regex", cfg: validation.ActiveSeriesTrackerConfig{Name: "test", Matchers: `{__name__=~"[invalid"}`}, expectErr: true}, + {name: "empty matchers", cfg: validation.ActiveSeriesTrackerConfig{Name: "test", Matchers: ``}, expectErr: true}, + {name: "empty name", cfg: validation.ActiveSeriesTrackerConfig{Name: "", Matchers: `{__name__="foo"}`}, expectErr: true}, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + err := tc.cfg.Validate() + if tc.expectErr { + assert.Error(t, err) + } else { + require.NoError(t, err) + assert.NotEmpty(t, tc.cfg.ParsedMatchers()) + } + }) + } +} + +func TestActiveSeriesTrackersConfig_Validate(t *testing.T) { + t.Run("valid", func(t *testing.T) { + cfg := validation.ActiveSeriesTrackersConfig{ + {Name: "a", Matchers: `{__name__="foo"}`}, + {Name: "b", Matchers: `{__name__=~"bar.*"}`}, + } + require.NoError(t, cfg.Validate()) + }) + + t.Run("duplicate names", func(t *testing.T) { + cfg := validation.ActiveSeriesTrackersConfig{ + {Name: "dup", Matchers: `{__name__="foo"}`}, + {Name: "dup", Matchers: `{__name__="bar"}`}, + } + assert.Error(t, cfg.Validate()) + }) + + t.Run("nil is valid", func(t *testing.T) { + var cfg validation.ActiveSeriesTrackersConfig + require.NoError(t, cfg.Validate()) + }) +} + +func TestTrackerCounter_IncreaseDecrease(t *testing.T) { + tc := newTrackerCounter() + + trackers := validation.ActiveSeriesTrackersConfig{ + {Name: "api", Matchers: `{__name__=~"api_.*"}`}, + {Name: "gateway", Matchers: `{job="gateway"}`}, + } + require.NoError(t, trackers.Validate()) + + // Simulate config load. + tc.mu.Lock() + tc.matchers = map[string][]*labels.Matcher{ + "api": trackers[0].ParsedMatchers(), + "gateway": trackers[1].ParsedMatchers(), + } + tc.mu.Unlock() + + // Add series. 
+ tc.increase(labels.FromStrings("__name__", "api_requests_total", "job", "gateway")) + tc.increase(labels.FromStrings("__name__", "api_errors_total", "job", "gateway")) + tc.increase(labels.FromStrings("__name__", "node_cpu_seconds", "job", "node")) + + tc.mu.Lock() + assert.Equal(t, 2, tc.counts["api"]) + assert.Equal(t, 2, tc.counts["gateway"]) + tc.mu.Unlock() + + // Remove one series. + tc.decrease(labels.FromStrings("__name__", "api_requests_total", "job", "gateway")) + + tc.mu.Lock() + assert.Equal(t, 1, tc.counts["api"]) + assert.Equal(t, 1, tc.counts["gateway"]) + tc.mu.Unlock() +} + +func TestTrackerCounter_UpdateMetrics(t *testing.T) { + tc := newTrackerCounter() + gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: "test"}, []string{"user", "name"}) + + trackers := validation.ActiveSeriesTrackersConfig{ + {Name: "api", Matchers: `{__name__=~"api_.*"}`}, + {Name: "node", Matchers: `{__name__=~"node_.*"}`}, + } + require.NoError(t, trackers.Validate()) + + tc.mu.Lock() + tc.matchers = map[string][]*labels.Matcher{ + "api": trackers[0].ParsedMatchers(), + "node": trackers[1].ParsedMatchers(), + } + tc.counts = map[string]int{"api": 5, "node": 3} + tc.mu.Unlock() + + tc.updateMetrics(gauge, "user-1", trackers) + + m := &dto.Metric{} + require.NoError(t, gauge.WithLabelValues("user-1", "api").Write(m)) + assert.Equal(t, float64(5), m.GetGauge().GetValue()) + require.NoError(t, gauge.WithLabelValues("user-1", "node").Write(m)) + assert.Equal(t, float64(3), m.GetGauge().GetValue()) + + // Remove "node" tracker — stale metric should be cleaned up. + trackers2 := validation.ActiveSeriesTrackersConfig{ + {Name: "api", Matchers: `{__name__=~"api_.*"}`}, + } + require.NoError(t, trackers2.Validate()) + tc.updateMetrics(gauge, "user-1", trackers2) + + // "node" metric deleted (re-creating gives 0). + require.NoError(t, gauge.WithLabelValues("user-1", "node").Write(m)) + assert.Equal(t, float64(0), m.GetGauge().GetValue()) + // "api" still reported. 
+ require.NoError(t, gauge.WithLabelValues("user-1", "api").Write(m)) + assert.Equal(t, float64(5), m.GetGauge().GetValue()) +} + +func TestTrackerCounter_UpdateConfig_Backfill(t *testing.T) { + tc := newTrackerCounter() + + // Without a DB, just sets matchers. + trackers := validation.ActiveSeriesTrackersConfig{ + {Name: "api", Matchers: `{__name__=~"api_.*"}`}, + } + require.NoError(t, trackers.Validate()) + tc.updateConfig(context.Background(), nil, trackers) + + tc.mu.Lock() + assert.NotNil(t, tc.matchers["api"]) + assert.Equal(t, 0, tc.counts["api"]) + tc.mu.Unlock() +} + +func TestMatchesAll(t *testing.T) { + lbs := labels.FromStrings("__name__", "http_requests_total", "method", "GET", "status", "200") + + tests := []struct { + name string + matchers []*labels.Matcher + expected bool + }{ + { + name: "all match", + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "__name__", "http_requests_total"), labels.MustNewMatcher(labels.MatchEqual, "method", "GET")}, + expected: true, + }, + { + name: "one doesn't match", + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "__name__", "http_requests_total"), labels.MustNewMatcher(labels.MatchEqual, "method", "POST")}, + expected: false, + }, + { + name: "regex match", + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "__name__", "http_.*")}, + expected: true, + }, + { + name: "empty matchers matches everything", + matchers: nil, + expected: true, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + assert.Equal(t, tc.expected, matchesAll(lbs, tc.matchers)) + }) + } +} + +// Ensure ActiveSeries still works (we kept matchesAll but removed ActiveForMatchers). 
+func TestActiveSeries_Basic(t *testing.T) { + as := NewActiveSeries() + now := time.Now() + + s := labels.FromStrings("__name__", "test", "job", "app") + as.UpdateSeries(s, s.Hash(), now, false, func(l labels.Labels) labels.Labels { return l.Copy() }) + + assert.Equal(t, 1, as.Active()) +} diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index cffb9ca332b..61a7f73d895 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -404,6 +404,9 @@ type userTSDB struct { blockRetentionPeriod int64 postingCache cortex_tsdb.ExpandedPostingsCache + + // Tracks active series per configured tracker pattern. + trackerCounter *trackerCounter } // Explicitly wrapping the tsdb.DB functions that we use. @@ -545,6 +548,7 @@ func (u *userTSDB) PostCreation(metric labels.Labels) { } u.seriesInMetric.increaseSeriesForMetric(metricName) u.labelSetCounter.increaseSeriesLabelSet(u, metric) + u.trackerCounter.increase(metric) if u.postingCache != nil { u.postingCache.ExpireSeries(metric) @@ -563,6 +567,7 @@ func (u *userTSDB) PostDeletion(metrics map[chunks.HeadSeriesRef]labels.Labels) } u.seriesInMetric.decreaseSeriesForMetric(metricName) u.labelSetCounter.decreaseSeriesLabelSet(u, metric) + u.trackerCounter.decrease(metric) if u.postingCache != nil { u.postingCache.ExpireSeries(metric) } @@ -1146,6 +1151,11 @@ func (i *Ingester) updateActiveSeries(ctx context.Context) { if err := userDB.labelSetCounter.UpdateMetric(ctx, userDB, i.metrics); err != nil { level.Warn(i.logger).Log("msg", "failed to update per labelSet metrics", "user", userID, "err", err) } + + // Per-tenant active series trackers (hot-reloadable via runtime config overrides). 
+ trackers := i.limits.ActiveSeriesTrackers(userID) + userDB.trackerCounter.updateConfig(ctx, userDB.db, trackers) + userDB.trackerCounter.updateMetrics(i.metrics.activeSeriesPerTracker, userID, trackers) } } @@ -2868,6 +2878,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { activeQueriedSeries: activeQueriedSeries, seriesInMetric: newMetricCounter(i.limiter, i.cfg.getIgnoreSeriesLimitForMetricNamesMap()), labelSetCounter: newLabelSetCounter(i.limiter), + trackerCounter: newTrackerCounter(), ingestedAPISamples: util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod), ingestedRuleSamples: util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod), diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index 1487696fce3..238c578e656 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -60,6 +60,7 @@ type ingesterMetrics struct { activeQueriedSeriesPerUser *prometheus.GaugeVec limitsPerLabelSet *prometheus.GaugeVec usagePerLabelSet *prometheus.GaugeVec + activeSeriesPerTracker *prometheus.GaugeVec // Global limit metrics maxUsersGauge prometheus.GaugeFunc @@ -297,6 +298,12 @@ func newIngesterMetrics(r prometheus.Registerer, Help: "Number of currently active native histogram series per user.", }, []string{"user"}), + // Not registered automatically, but only if activeSeriesEnabled is true. + activeSeriesPerTracker: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "cortex_ingester_active_series_per_tracker", + Help: "Number of currently active series matching a configured tracker pattern.", + }, []string{"user", "name"}), + // Not registered automatically, but only if activeQueriedSeriesEnabled is true. 
activeQueriedSeriesPerUser: prometheus.NewGaugeVec(prometheus.GaugeOpts{ Name: "cortex_ingester_active_queried_series", @@ -342,6 +349,7 @@ func newIngesterMetrics(r prometheus.Registerer, if activeSeriesEnabled && r != nil { r.MustRegister(m.activeSeriesPerUser) r.MustRegister(m.activeNHSeriesPerUser) + r.MustRegister(m.activeSeriesPerTracker) } if activeQueriedSeriesEnabled && r != nil { @@ -372,6 +380,7 @@ func (m *ingesterMetrics) deletePerUserMetrics(userID string) { m.memMetadataRemovedTotal.DeleteLabelValues(userID) m.activeSeriesPerUser.DeleteLabelValues(userID) m.activeNHSeriesPerUser.DeleteLabelValues(userID) + m.activeSeriesPerTracker.DeletePartialMatch(prometheus.Labels{"user": userID}) m.activeQueriedSeriesPerUser.DeletePartialMatch(prometheus.Labels{"user": userID}) m.usagePerLabelSet.DeletePartialMatch(prometheus.Labels{"user": userID}) m.limitsPerLabelSet.DeletePartialMatch(prometheus.Labels{"user": userID}) diff --git a/pkg/ingester/tracker_counter.go b/pkg/ingester/tracker_counter.go new file mode 100644 index 00000000000..396a3f9a7e5 --- /dev/null +++ b/pkg/ingester/tracker_counter.go @@ -0,0 +1,133 @@ +package ingester + +import ( + "context" + "sync" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/tsdb" + + "github.com/cortexproject/cortex/pkg/util/validation" +) + +// trackerCounter counts active series per configured tracker pattern. +// It increments on series creation and decrements on series deletion, +// similar to labelSetCounter but using PromQL matchers. +type trackerCounter struct { + mu sync.Mutex + counts map[string]int // tracker name -> count + matchers map[string][]*labels.Matcher // tracker name -> compiled matchers +} + +func newTrackerCounter() *trackerCounter { + return &trackerCounter{ + counts: make(map[string]int), + matchers: make(map[string][]*labels.Matcher), + } +} + +// updateConfig applies new tracker configuration. 
If trackers changed, backfills +// counts from the TSDB head index. +func (tc *trackerCounter) updateConfig(ctx context.Context, db *tsdb.DB, trackers validation.ActiveSeriesTrackersConfig) { + tc.mu.Lock() + defer tc.mu.Unlock() + + newMatchers := make(map[string][]*labels.Matcher, len(trackers)) + for i := range trackers { + newMatchers[trackers[i].Name] = trackers[i].ParsedMatchers() + } + + if trackerMatchersEqual(tc.matchers, newMatchers) { + return + } + + // Config changed — backfill counts from TSDB head. + tc.matchers = newMatchers + tc.counts = make(map[string]int, len(newMatchers)) + + if db == nil || len(newMatchers) == 0 { + return + } + + ir, err := db.Head().Index() + if err != nil { + return + } + defer ir.Close() + + // Use PostingsForMatchers per tracker for efficient index-based counting. + for name, m := range tc.matchers { + p, err := tsdb.PostingsForMatchers(ctx, ir, m...) + if err != nil { + continue + } + count := 0 + for p.Next() { + count++ + } + tc.counts[name] = count + } +} + +// increase is called on PostCreation for a new series. +func (tc *trackerCounter) increase(metric labels.Labels) { + tc.mu.Lock() + defer tc.mu.Unlock() + + for name, m := range tc.matchers { + if matchesAll(metric, m) { + tc.counts[name]++ + } + } +} + +// decrease is called on PostDeletion for a removed series. +func (tc *trackerCounter) decrease(metric labels.Labels) { + tc.mu.Lock() + defer tc.mu.Unlock() + + for name, m := range tc.matchers { + if matchesAll(metric, m) { + tc.counts[name]-- + } + } +} + +// updateMetrics reports current counts and cleans up stale tracker metrics. 
+func (tc *trackerCounter) updateMetrics(gauge *prometheus.GaugeVec, userID string, trackers validation.ActiveSeriesTrackersConfig) { + tc.mu.Lock() + defer tc.mu.Unlock() + + activeNames := make(map[string]struct{}, len(trackers)) + for _, t := range trackers { + gauge.WithLabelValues(userID, t.Name).Set(float64(tc.counts[t.Name])) + activeNames[t.Name] = struct{}{} + } + + // Delete metrics for trackers no longer in config. + for name := range tc.counts { + if _, ok := activeNames[name]; !ok { + gauge.DeleteLabelValues(userID, name) + delete(tc.counts, name) + } + } +} + +func trackerMatchersEqual(a, b map[string][]*labels.Matcher) bool { + if len(a) != len(b) { + return false + } + for name, am := range a { + bm, ok := b[name] + if !ok || len(am) != len(bm) { + return false + } + for i := range am { + if am[i].String() != bm[i].String() { + return false + } + } + } + return true +} diff --git a/pkg/util/validation/active_series_tracker.go b/pkg/util/validation/active_series_tracker.go new file mode 100644 index 00000000000..4a95b790671 --- /dev/null +++ b/pkg/util/validation/active_series_tracker.go @@ -0,0 +1,61 @@ +package validation + +import ( + "errors" + "fmt" + + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/promql/parser" +) + +var ( + errActiveSeriesTrackerEmptyName = errors.New("active series tracker name must not be empty") + errActiveSeriesTrackerDuplicateName = errors.New("duplicate active series tracker name") +) + +// ActiveSeriesTrackerConfig defines a single tracker entry that counts active series +// matching a set of label matchers. This is used for internal monitoring only — +// no limits are enforced. +type ActiveSeriesTrackerConfig struct { + Name string `yaml:"name" json:"name" doc:"nocli|description=Name of the tracker, used as a label value in the emitted metric."` + Matchers string `yaml:"matchers" json:"matchers" doc:"nocli|description=PromQL series selector (e.g. {__name__=~\"api_.*\"}). 
All matchers must match for a series to be counted."` + + // Parsed matchers, populated during validation. + parsedMatchers []*labels.Matcher `yaml:"-" json:"-" doc:"nocli"` +} + +// ParsedMatchers returns the compiled matchers. Must call Validate() first. +func (c *ActiveSeriesTrackerConfig) ParsedMatchers() []*labels.Matcher { + return c.parsedMatchers +} + +// Validate parses the matchers string into compiled label matchers. +func (c *ActiveSeriesTrackerConfig) Validate() error { + if c.Name == "" { + return errActiveSeriesTrackerEmptyName + } + matchers, err := parser.ParseMetricSelector(c.Matchers) + if err != nil { + return fmt.Errorf("active series tracker %q: %w", c.Name, err) + } + c.parsedMatchers = matchers + return nil +} + +// ActiveSeriesTrackersConfig is a list of tracker configurations. +type ActiveSeriesTrackersConfig []ActiveSeriesTrackerConfig + +// Validate parses and validates all tracker entries, ensuring names are unique and non-empty. +func (c ActiveSeriesTrackersConfig) Validate() error { + names := make(map[string]struct{}, len(c)) + for i := range c { + if err := c[i].Validate(); err != nil { + return err + } + if _, exists := names[c[i].Name]; exists { + return fmt.Errorf("%w: %q", errActiveSeriesTrackerDuplicateName, c[i].Name) + } + names[c[i].Name] = struct{}{} + } + return nil +} diff --git a/pkg/util/validation/active_series_tracker_test.go b/pkg/util/validation/active_series_tracker_test.go new file mode 100644 index 00000000000..818b51a62d8 --- /dev/null +++ b/pkg/util/validation/active_series_tracker_test.go @@ -0,0 +1,112 @@ +package validation + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestActiveSeriesTrackerConfig_Validate(t *testing.T) { + t.Run("valid regex", func(t *testing.T) { + cfg := ActiveSeriesTrackerConfig{Name: "test", Matchers: `{__name__=~"api_.*"}`} + require.NoError(t, cfg.Validate()) + assert.Len(t, cfg.ParsedMatchers(), 1) + }) + + 
+	t.Run("invalid regex", func(t *testing.T) {
+		cfg := ActiveSeriesTrackerConfig{Name: "test", Matchers: `{__name__=~"[bad"}`}
+		assert.Error(t, cfg.Validate())
+	})
+
+	t.Run("empty matchers", func(t *testing.T) {
+		cfg := ActiveSeriesTrackerConfig{Name: "test", Matchers: ``}
+		assert.Error(t, cfg.Validate())
+	})
+
+	t.Run("empty name", func(t *testing.T) {
+		cfg := ActiveSeriesTrackerConfig{Name: "", Matchers: `{__name__="foo"}`}
+		assert.ErrorIs(t, cfg.Validate(), errActiveSeriesTrackerEmptyName)
+	})
+}
+
+func TestActiveSeriesTrackersConfig_Validate(t *testing.T) {
+	t.Run("all valid", func(t *testing.T) {
+		cfg := ActiveSeriesTrackersConfig{
+			{Name: "a", Matchers: `{__name__="foo"}`},
+			{Name: "b", Matchers: `{__name__=~"bar.*"}`},
+		}
+		require.NoError(t, cfg.Validate())
+	})
+
+	t.Run("one invalid fails all", func(t *testing.T) {
+		cfg := ActiveSeriesTrackersConfig{
+			{Name: "a", Matchers: `{__name__="foo"}`},
+			{Name: "bad", Matchers: `{__name__=~"[bad"}`},
+		}
+		assert.Error(t, cfg.Validate())
+	})
+
+	t.Run("nil is valid", func(t *testing.T) {
+		var cfg ActiveSeriesTrackersConfig
+		require.NoError(t, cfg.Validate())
+	})
+
+	t.Run("duplicate names", func(t *testing.T) {
+		cfg := ActiveSeriesTrackersConfig{
+			{Name: "dup", Matchers: `{__name__="foo"}`},
+			{Name: "dup", Matchers: `{__name__="bar"}`},
+		}
+		assert.ErrorIs(t, cfg.Validate(), errActiveSeriesTrackerDuplicateName)
+	})
+
+	t.Run("empty name in list", func(t *testing.T) {
+		cfg := ActiveSeriesTrackersConfig{
+			{Name: "", Matchers: `{__name__="foo"}`},
+		}
+		assert.ErrorIs(t, cfg.Validate(), errActiveSeriesTrackerEmptyName)
+	})
+}
+
+func TestOverrides_ActiveSeriesTrackers_PerTenant(t *testing.T) {
+	defaults := Limits{}
+	defaults.ActiveSeriesTrackers = ActiveSeriesTrackersConfig{
+		{Name: "default_tracker", Matchers: `{__name__=~".*"}`},
+	}
+	require.NoError(t, defaults.ActiveSeriesTrackers.Validate())
+
+	SetDefaultLimitsForYAMLUnmarshalling(defaults)
+
+	tenantTrackers := ActiveSeriesTrackersConfig{
+		{Name: "tenant_api", Matchers: `{__name__=~"api_.*"}`},
+		{Name: "tenant_node", Matchers: `{__name__=~"node_.*"}`},
+	}
+	require.NoError(t, tenantTrackers.Validate())
+
+	tenantLimits := &Limits{}
+	*tenantLimits = defaults
+	tenantLimits.ActiveSeriesTrackers = tenantTrackers
+
+	// NOTE(review): NewOverrides has historically returned (*Overrides, error)
+	// in this package — confirm this single-value assignment compiles.
+	overrides := NewOverrides(defaults, newMockTenantLimits(map[string]*Limits{
+		"tenant-with-override": tenantLimits,
+	}))
+
+	// Tenant with override gets tenant-specific trackers.
+	trackers := overrides.ActiveSeriesTrackers("tenant-with-override")
+	require.Len(t, trackers, 2)
+	assert.Equal(t, "tenant_api", trackers[0].Name)
+	assert.Equal(t, "tenant_node", trackers[1].Name)
+
+	// Tenant without override gets default trackers.
+	trackers = overrides.ActiveSeriesTrackers("tenant-without-override")
+	require.Len(t, trackers, 1)
+	assert.Equal(t, "default_tracker", trackers[0].Name)
+}
+
+func TestOverrides_ActiveSeriesTrackers_NoDefaults(t *testing.T) {
+	defaults := Limits{}
+	overrides := NewOverrides(defaults, nil)
+
+	trackers := overrides.ActiveSeriesTrackers("any-tenant")
+	assert.Empty(t, trackers)
+}
diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go
index a9b92c866c5..8765f4c5c7b 100644
--- a/pkg/util/validation/limits.go
+++ b/pkg/util/validation/limits.go
@@ -159,14 +159,15 @@ type Limits struct {
 	// Ingester enforced limits.
 	// Series
-	MaxLocalSeriesPerUser                 int                 `yaml:"max_series_per_user" json:"max_series_per_user"`
-	MaxLocalSeriesPerMetric               int                 `yaml:"max_series_per_metric" json:"max_series_per_metric"`
-	MaxLocalNativeHistogramSeriesPerUser  int                 `yaml:"max_native_histogram_series_per_user" json:"max_native_histogram_series_per_user"`
-	MaxGlobalSeriesPerUser                int                 `yaml:"max_global_series_per_user" json:"max_global_series_per_user"`
-	MaxGlobalSeriesPerMetric              int                 `yaml:"max_global_series_per_metric" json:"max_global_series_per_metric"`
-	MaxGlobalNativeHistogramSeriesPerUser int                 `yaml:"max_global_native_histogram_series_per_user" json:"max_global_native_histogram_series_per_user"`
-	LimitsPerLabelSet                     []LimitsPerLabelSet `yaml:"limits_per_label_set" json:"limits_per_label_set" doc:"nocli|description=[Experimental] Enable limits per LabelSet. Supported limits per labelSet: [max_series]"`
-	EnableNativeHistograms                bool                `yaml:"enable_native_histograms" json:"enable_native_histograms"`
+	MaxLocalSeriesPerUser                 int                        `yaml:"max_series_per_user" json:"max_series_per_user"`
+	MaxLocalSeriesPerMetric               int                        `yaml:"max_series_per_metric" json:"max_series_per_metric"`
+	MaxLocalNativeHistogramSeriesPerUser  int                        `yaml:"max_native_histogram_series_per_user" json:"max_native_histogram_series_per_user"`
+	MaxGlobalSeriesPerUser                int                        `yaml:"max_global_series_per_user" json:"max_global_series_per_user"`
+	MaxGlobalSeriesPerMetric              int                        `yaml:"max_global_series_per_metric" json:"max_global_series_per_metric"`
+	MaxGlobalNativeHistogramSeriesPerUser int                        `yaml:"max_global_native_histogram_series_per_user" json:"max_global_native_histogram_series_per_user"`
+	LimitsPerLabelSet                     []LimitsPerLabelSet        `yaml:"limits_per_label_set" json:"limits_per_label_set" doc:"nocli|description=[Experimental] Enable limits per LabelSet. Supported limits per labelSet: [max_series]"`
+	ActiveSeriesTrackers                  ActiveSeriesTrackersConfig `yaml:"active_series_trackers,omitempty" json:"active_series_trackers,omitempty" doc:"nocli|description=List of active series tracker configurations. Each tracker counts active series matching its matchers and exposes the count as a metric."`
+	EnableNativeHistograms                bool                       `yaml:"enable_native_histograms" json:"enable_native_histograms"`
 
 	// Regex matcher query limits.
 	MaxRegexPatternLength int `yaml:"max_regex_pattern_length" json:"max_regex_pattern_length"`
@@ -494,6 +495,10 @@ func (l *Limits) UnmarshalYAML(unmarshal func(any) error) error {
 		return err
 	}
 
+	if err := l.ActiveSeriesTrackers.Validate(); err != nil {
+		return err
+	}
+
 	return nil
 }
@@ -524,6 +529,10 @@ func (l *Limits) UnmarshalJSON(data []byte) error {
 		return err
 	}
 
+	if err := l.ActiveSeriesTrackers.Validate(); err != nil {
+		return err
+	}
+
 	return nil
 }
@@ -824,6 +833,11 @@ func (o *Overrides) LimitsPerLabelSet(userID string) []LimitsPerLabelSet {
 	return o.GetOverridesForUser(userID).LimitsPerLabelSet
 }
 
+// ActiveSeriesTrackers returns the active series tracker configurations for a given user, falling back to the default limits when the tenant has no override.
+func (o *Overrides) ActiveSeriesTrackers(userID string) ActiveSeriesTrackersConfig {
+	return o.GetOverridesForUser(userID).ActiveSeriesTrackers
+}
+
 // MaxChunksPerQueryFromStore returns the maximum number of chunks allowed per query when fetching
 // chunks from the long-term storage.
func (o *Overrides) MaxChunksPerQueryFromStore(userID string) int { diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index 2521b40aca8..d21c1481659 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -2,6 +2,19 @@ "$id": "https://raw.githubusercontent.com/cortexproject/cortex/master/schemas/cortex-config-schema.json", "$schema": "https://json-schema.org/draft/2020-12/schema", "definitions": { + "ActiveSeriesTrackerConfig": { + "properties": { + "matchers": { + "description": "PromQL series selector (e.g. {__name__=~\"api_.*\"}). All matchers must match for a series to be counted.", + "type": "string" + }, + "name": { + "description": "Name of the tracker, used as a label value in the emitted metric.", + "type": "string" + } + }, + "type": "object" + }, "DisabledRuleGroup": { "properties": { "name": { @@ -4950,6 +4963,14 @@ "type": "boolean", "x-cli-flag": "experimental.distributor.ha-tracker.mixed-ha-samples" }, + "active_series_trackers": { + "default": [], + "description": "List of active series tracker configurations. Each tracker counts active series matching its matchers and exposes the count as a metric.", + "items": { + "type": "string" + }, + "type": "array" + }, "alertmanager_max_alerts_count": { "default": 0, "description": "Maximum number of alerts that a single user can have. Inserting more alerts will fail with a log message and metric increment. 0 = no limit.",