diff --git a/CHANGELOG.md b/CHANGELOG.md index c4505a58558..d081d9ca6fb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## master / unreleased * [CHANGE] Querier: Make query time range configurations per-tenant: `query_ingesters_within`, `query_store_after`, and `shuffle_sharding_ingesters_lookback_period`. Uses `model.Duration` instead of `time.Duration` to support serialization but has minimum unit of 1ms (nanoseconds/microseconds not supported). #7160 * [CHANGE] Cache: Setting `-blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl` to 0 will disable the bucket-index cache. #7446 +* [FEATURE] Ingester: Add experimental active series tracker that counts active series by configurable label matchers (including regex) per tenant and exposes `cortex_ingester_active_series_per_tracker` metric. Configured via `active_series_trackers` in runtime config overrides. #7476 * [FEATURE] Ruler: Add per-tenant `ruler_alert_generator_url_template` runtime config option to customize alert generator URLs using Go templates. Supports Grafana Explore, Perses, and other UIs. #7302 * [FEATURE] Distributor: Add experimental `-distributor.enable-start-timestamp` flag for Prometheus Remote Write 2.0. When enabled, `StartTimestamp (ST)` is ingested. #7371 * [FEATURE] Memberlist: Add `-memberlist.cluster-label` and `-memberlist.cluster-label-verification-disabled` to prevent accidental cross-cluster gossip joins and support rolling label rollout. #7385 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 2091fb52ff9..c521bccfcc6 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4153,6 +4153,10 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s # [max_series] [limits_per_label_set: | default = []] +# List of active series tracker configurations. 
Each tracker counts active +# series matching its matchers and exposes the count as a metric. +[active_series_trackers: | default = []] + # [EXPERIMENTAL] True to enable native histogram. # CLI flag: -blocks-storage.tsdb.enable-native-histograms [enable_native_histograms: | default = false] @@ -6864,6 +6868,17 @@ limits: [label_set: | default = []] ``` +### `ActiveSeriesTrackerConfig` + +```yaml +# Name of the tracker, used as a label value in the emitted metric. +[name: | default = ""] + +# PromQL series selector (e.g. {__name__=~"api_.*"}). All matchers must match +# for a series to be counted. +[matchers: | default = ""] +``` + ### `PriorityDef` ```yaml diff --git a/docs/configuration/v1-guarantees.md b/docs/configuration/v1-guarantees.md index be3ee78ce59..092d0d18f9b 100644 --- a/docs/configuration/v1-guarantees.md +++ b/docs/configuration/v1-guarantees.md @@ -129,3 +129,6 @@ Currently experimental features are: - `-validation.max-label-cardinality-for-unoptimized-regex` (int) - maximum label cardinality - `-validation.max-total-label-value-length-for-unoptimized-regex` (int) - maximum total length of all label values in bytes - HATracker: `-distributor.ha-tracker.enable-startup-sync` (bool) - If enabled, fetches all tracked keys on startup to populate the local cache. +- Ingester: Active Series Tracker + - Per-tenant `active_series_trackers` configuration in runtime config overrides + - Counts active series matching PromQL label matchers and exposes `cortex_ingester_active_series_per_tracker` metric diff --git a/docs/proposals/active-series-tracker.md b/docs/proposals/active-series-tracker.md new file mode 100644 index 00000000000..b549ee5b336 --- /dev/null +++ b/docs/proposals/active-series-tracker.md @@ -0,0 +1,71 @@ +# Active Series Tracker + +## Problem + +AMP needs to monitor active series counts by configurable patterns (e.g., all series with `__name__=~"api_.*"`) for internal observability. 
The existing `LimitsPerLabelSet` feature is unsuitable because: + +1. **No regex matching** — only supports exact `label=value` matching. +2. **Default partition side-effects** — adding labelset buckets reduces the default partition count. +3. **Coupled to limit enforcement** — designed for enforcing series limits, not pure monitoring. + +## Requirements + +- Track active series counts by configurable label matchers (including regex). +- Expose counts as Prometheus metrics on the ingester (internal only, not vended to customers). +- Configuration supports **per-tenant overrides** with a **default** fallback (same pattern as all other Limits fields). +- **Runtime hot-reloadable** via the existing runtime config file mechanism. +- **No limit enforcement** — purely observational. +- **No default partition** — unmatched series are simply not tracked. +- A series can match multiple tracker entries simultaneously. + +## Design + +### Configuration + +Tracker config lives in the `Limits` struct, following the same per-tenant override pattern as `LimitsPerLabelSet`: + +```yaml +# Default trackers (applied to all tenants without overrides) +limits: + active_series_trackers: + - name: api_metrics + matchers: '{__name__=~"api_.*"}' + +# Per-tenant overrides via runtime config +overrides: + tenant-123: + active_series_trackers: + - name: api_metrics + matchers: '{__name__=~"api_.*"}' + - name: system_metrics + matchers: '{__name__=~"node_.*|process_.*"}' +``` + +The `matchers` field uses standard PromQL matcher syntax parsed via `parser.ParseMetricSelector`. + +### Runtime Reload + +Tracker config is part of `Limits`, which is reloaded via the runtime config manager every `runtime-config.reload-period` (default 10s). Matchers are parsed and validated during YAML/JSON unmarshalling. Invalid configs are rejected (existing config stays active). 
+ +### Metrics + +A new gauge metric emitted per ingester: + +``` +cortex_ingester_active_series_per_tracker{user="", name=""} +``` + +### Matching Logic + +On each active series metrics update tick (default 1min), for each tenant: +1. Read the tenant's tracker config via `i.limits.ActiveSeriesTrackers(userID)` +2. For each tracker, count active series whose labels satisfy all matchers +3. Emit the gauge metric + +A series can match multiple trackers. Tenants without configured trackers emit no tracker metrics. + +### Performance Considerations + +- Matching runs once per update period (default 1min), not on every sample ingestion. +- The number of trackers is expected to be small (< 10). +- Compiled matchers are cached in the parsed Limits and only recompiled on config change. diff --git a/integration/active_series_tracker_test.go b/integration/active_series_tracker_test.go new file mode 100644 index 00000000000..90689cf2178 --- /dev/null +++ b/integration/active_series_tracker_test.go @@ -0,0 +1,160 @@ +//go:build requires_docker + +package integration + +import ( + "fmt" + "path/filepath" + "testing" + "time" + + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/prompb" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v3" + + "github.com/cortexproject/cortex/integration/e2e" + e2edb "github.com/cortexproject/cortex/integration/e2e/db" + "github.com/cortexproject/cortex/integration/e2ecortex" +) + +func TestActiveSeriesTrackerPerTenant(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // Write runtime config with per-tenant active series trackers. 
+ runtimeConfig := map[string]interface{}{ + "overrides": map[string]interface{}{ + "user-1": map[string]interface{}{ + "active_series_trackers": []map[string]string{ + {"name": "api_metrics", "matchers": `{__name__=~"api_.*"}`}, + {"name": "node_metrics", "matchers": `{__name__=~"node_.*"}`}, + }, + }, + }, + } + runtimeCfgYAML, err := yaml.Marshal(runtimeConfig) + require.NoError(t, err) + require.NoError(t, writeFileToSharedDir(s, runtimeConfigFile, runtimeCfgYAML)) + + flags := BlocksStorageFlags() + flags["-distributor.shard-by-all-labels"] = "true" + flags["-ingester.active-series-metrics-enabled"] = "true" + flags["-ingester.active-series-metrics-update-period"] = "2s" + flags["-ingester.active-series-metrics-idle-timeout"] = "5m" + flags["-runtime-config.file"] = filepath.Join(e2e.ContainerSharedDir, runtimeConfigFile) + flags["-runtime-config.reload-period"] = "1s" + flags["-alertmanager.web.external-url"] = "http://localhost/alertmanager" + flags["-alertmanager-storage.backend"] = "local" + flags["-alertmanager-storage.local.path"] = filepath.Join(e2e.ContainerSharedDir, "alertmanager_configs") + + require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{})) + + consul := e2edb.NewConsul() + minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, s.StartAndWaitReady(consul, minio)) + + flags["-ring.store"] = "consul" + flags["-consul.hostname"] = consul.NetworkHTTPEndpoint() + + cortex := e2ecortex.NewSingleBinary("cortex-1", flags, "") + require.NoError(t, s.StartAndWaitReady(cortex)) + + // Wait until the ring is ready. 
+ require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total")) + + c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + + now := time.Now() + for _, name := range []string{"api_requests_total", "api_errors_total", "node_cpu_seconds", "process_memory_bytes"} { + series, _ := generateSeries(name, now, prompb.Label{Name: "job", Value: "test"}) + res, err := c.Push(series) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode, fmt.Sprintf("push %s failed", name)) + } + + // user-1 has trackers: api_metrics (matches 2), node_metrics (matches 1). + require.NoError(t, cortex.WaitSumMetricsWithOptions( + e2e.Equals(2), + []string{"cortex_ingester_active_series_per_tracker"}, + e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "user", "user-1"), + labels.MustNewMatcher(labels.MatchEqual, "name", "api_metrics"), + ), + e2e.WaitMissingMetrics, + )) + + require.NoError(t, cortex.WaitSumMetricsWithOptions( + e2e.Equals(1), + []string{"cortex_ingester_active_series_per_tracker"}, + e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "user", "user-1"), + labels.MustNewMatcher(labels.MatchEqual, "name", "node_metrics"), + ), + e2e.WaitMissingMetrics, + )) + + // user-2 has no trackers configured — should have no tracker metrics. + c2, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-2") + require.NoError(t, err) + + series2, _ := generateSeries("api_requests_total", now, prompb.Label{Name: "job", Value: "test"}) + res, err := c2.Push(series2) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + // Wait for user-2 active series to be counted. 
+ require.NoError(t, cortex.WaitSumMetricsWithOptions( + e2e.Equals(1), + []string{"cortex_ingester_active_series"}, + e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "user", "user-2")), + e2e.WaitMissingMetrics, + )) + + // user-2 should have no tracker metrics. + sum, err := cortex.SumMetrics( + []string{"cortex_ingester_active_series_per_tracker"}, + e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "user", "user-2")), + e2e.SkipMissingMetrics, + ) + require.NoError(t, err) + require.Equal(t, 0.0, sum[0]) + + // Now update runtime config: remove node_metrics tracker for user-1. + runtimeConfig2 := map[string]interface{}{ + "overrides": map[string]interface{}{ + "user-1": map[string]interface{}{ + "active_series_trackers": []map[string]string{ + {"name": "api_metrics", "matchers": `{__name__=~"api_.*"}`}, + }, + }, + }, + } + runtimeCfgYAML2, err := yaml.Marshal(runtimeConfig2) + require.NoError(t, err) + require.NoError(t, writeFileToSharedDir(s, runtimeConfigFile, runtimeCfgYAML2)) + + // Wait for the stale node_metrics tracker metric to be removed. + require.NoError(t, cortex.WaitSumMetricsWithOptions( + e2e.Equals(0), + []string{"cortex_ingester_active_series_per_tracker"}, + e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "user", "user-1"), + labels.MustNewMatcher(labels.MatchEqual, "name", "node_metrics"), + ), + e2e.SkipMissingMetrics, + )) + + // api_metrics tracker should still work. 
+ require.NoError(t, cortex.WaitSumMetricsWithOptions( + e2e.Equals(2), + []string{"cortex_ingester_active_series_per_tracker"}, + e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "user", "user-1"), + labels.MustNewMatcher(labels.MatchEqual, "name", "api_metrics"), + ), + e2e.WaitMissingMetrics, + )) +} diff --git a/pkg/ingester/active_series.go b/pkg/ingester/active_series.go index 57134a03ca6..1a685a36cfa 100644 --- a/pkg/ingester/active_series.go +++ b/pkg/ingester/active_series.go @@ -248,3 +248,13 @@ func (s *activeSeriesStripe) getActiveNativeHistogram() int { return s.activeNativeHistogram } + +// matchesAll returns true if the labels satisfy all given matchers. +func matchesAll(lbs labels.Labels, matchers []*labels.Matcher) bool { + for _, m := range matchers { + if !m.Matches(lbs.Get(m.Name)) { + return false + } + } + return true +} diff --git a/pkg/ingester/active_series_tracker_test.go b/pkg/ingester/active_series_tracker_test.go new file mode 100644 index 00000000000..adcd2a03309 --- /dev/null +++ b/pkg/ingester/active_series_tracker_test.go @@ -0,0 +1,206 @@ +package ingester + +import ( + "context" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/cortexproject/cortex/pkg/util/validation" +) + +func TestActiveSeriesTrackerConfig_Validate(t *testing.T) { + tests := []struct { + name string + cfg validation.ActiveSeriesTrackerConfig + expectErr bool + }{ + {name: "valid exact", cfg: validation.ActiveSeriesTrackerConfig{Name: "test", Matchers: `{__name__="api_requests_total"}`}}, + {name: "valid regex", cfg: validation.ActiveSeriesTrackerConfig{Name: "test", Matchers: `{__name__=~"api_.*"}`}}, + {name: "valid multiple", cfg: validation.ActiveSeriesTrackerConfig{Name: "test", Matchers: `{__name__=~"api_.*", job="gateway"}`}}, 
+ {name: "invalid regex", cfg: validation.ActiveSeriesTrackerConfig{Name: "test", Matchers: `{__name__=~"[invalid"}`}, expectErr: true}, + {name: "empty matchers", cfg: validation.ActiveSeriesTrackerConfig{Name: "test", Matchers: ``}, expectErr: true}, + {name: "empty name", cfg: validation.ActiveSeriesTrackerConfig{Name: "", Matchers: `{__name__="foo"}`}, expectErr: true}, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + err := tc.cfg.Validate() + if tc.expectErr { + assert.Error(t, err) + } else { + require.NoError(t, err) + assert.NotEmpty(t, tc.cfg.ParsedMatchers()) + } + }) + } +} + +func TestActiveSeriesTrackersConfig_Validate(t *testing.T) { + t.Run("valid", func(t *testing.T) { + cfg := validation.ActiveSeriesTrackersConfig{ + {Name: "a", Matchers: `{__name__="foo"}`}, + {Name: "b", Matchers: `{__name__=~"bar.*"}`}, + } + require.NoError(t, cfg.Validate()) + }) + + t.Run("duplicate names", func(t *testing.T) { + cfg := validation.ActiveSeriesTrackersConfig{ + {Name: "dup", Matchers: `{__name__="foo"}`}, + {Name: "dup", Matchers: `{__name__="bar"}`}, + } + assert.Error(t, cfg.Validate()) + }) + + t.Run("nil is valid", func(t *testing.T) { + var cfg validation.ActiveSeriesTrackersConfig + require.NoError(t, cfg.Validate()) + }) +} + +func TestTrackerCounter_IncreaseDecrease(t *testing.T) { + tc := newTrackerCounter() + + trackers := validation.ActiveSeriesTrackersConfig{ + {Name: "api", Matchers: `{__name__=~"api_.*"}`}, + {Name: "gateway", Matchers: `{job="gateway"}`}, + } + require.NoError(t, trackers.Validate()) + + // Simulate config load. + tc.mu.Lock() + tc.matchers = map[string][]*labels.Matcher{ + "api": trackers[0].ParsedMatchers(), + "gateway": trackers[1].ParsedMatchers(), + } + tc.mu.Unlock() + + // Add series. 
+ tc.increase(labels.FromStrings("__name__", "api_requests_total", "job", "gateway")) + tc.increase(labels.FromStrings("__name__", "api_errors_total", "job", "gateway")) + tc.increase(labels.FromStrings("__name__", "node_cpu_seconds", "job", "node")) + + tc.mu.Lock() + assert.Equal(t, 2, tc.counts["api"]) + assert.Equal(t, 2, tc.counts["gateway"]) + tc.mu.Unlock() + + // Remove one series. + tc.decrease(labels.FromStrings("__name__", "api_requests_total", "job", "gateway")) + + tc.mu.Lock() + assert.Equal(t, 1, tc.counts["api"]) + assert.Equal(t, 1, tc.counts["gateway"]) + tc.mu.Unlock() +} + +func TestTrackerCounter_UpdateMetrics(t *testing.T) { + tc := newTrackerCounter() + gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: "test"}, []string{"user", "name"}) + + trackers := validation.ActiveSeriesTrackersConfig{ + {Name: "api", Matchers: `{__name__=~"api_.*"}`}, + {Name: "node", Matchers: `{__name__=~"node_.*"}`}, + } + require.NoError(t, trackers.Validate()) + + tc.mu.Lock() + tc.matchers = map[string][]*labels.Matcher{ + "api": trackers[0].ParsedMatchers(), + "node": trackers[1].ParsedMatchers(), + } + tc.counts = map[string]int{"api": 5, "node": 3} + tc.mu.Unlock() + + tc.updateMetrics(gauge, "user-1", trackers) + + m := &dto.Metric{} + require.NoError(t, gauge.WithLabelValues("user-1", "api").Write(m)) + assert.Equal(t, float64(5), m.GetGauge().GetValue()) + require.NoError(t, gauge.WithLabelValues("user-1", "node").Write(m)) + assert.Equal(t, float64(3), m.GetGauge().GetValue()) + + // Remove "node" tracker — stale metric should be cleaned up. + trackers2 := validation.ActiveSeriesTrackersConfig{ + {Name: "api", Matchers: `{__name__=~"api_.*"}`}, + } + require.NoError(t, trackers2.Validate()) + tc.updateMetrics(gauge, "user-1", trackers2) + + // "node" metric deleted (re-creating gives 0). + require.NoError(t, gauge.WithLabelValues("user-1", "node").Write(m)) + assert.Equal(t, float64(0), m.GetGauge().GetValue()) + // "api" still reported. 
+ require.NoError(t, gauge.WithLabelValues("user-1", "api").Write(m)) + assert.Equal(t, float64(5), m.GetGauge().GetValue()) +} + +func TestTrackerCounter_UpdateConfig_Backfill(t *testing.T) { + tc := newTrackerCounter() + + // Without a DB, just sets matchers. + trackers := validation.ActiveSeriesTrackersConfig{ + {Name: "api", Matchers: `{__name__=~"api_.*"}`}, + } + require.NoError(t, trackers.Validate()) + tc.updateConfig(context.Background(), nil, trackers) + + tc.mu.Lock() + assert.NotNil(t, tc.matchers["api"]) + assert.Equal(t, 0, tc.counts["api"]) + tc.mu.Unlock() +} + +func TestMatchesAll(t *testing.T) { + lbs := labels.FromStrings("__name__", "http_requests_total", "method", "GET", "status", "200") + + tests := []struct { + name string + matchers []*labels.Matcher + expected bool + }{ + { + name: "all match", + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "__name__", "http_requests_total"), labels.MustNewMatcher(labels.MatchEqual, "method", "GET")}, + expected: true, + }, + { + name: "one doesn't match", + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "__name__", "http_requests_total"), labels.MustNewMatcher(labels.MatchEqual, "method", "POST")}, + expected: false, + }, + { + name: "regex match", + matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "__name__", "http_.*")}, + expected: true, + }, + { + name: "empty matchers matches everything", + matchers: nil, + expected: true, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + assert.Equal(t, tc.expected, matchesAll(lbs, tc.matchers)) + }) + } +} + +// Ensure ActiveSeries still works (we kept matchesAll but removed ActiveForMatchers). 
+func TestActiveSeries_Basic(t *testing.T) { + as := NewActiveSeries() + now := time.Now() + + s := labels.FromStrings("__name__", "test", "job", "app") + as.UpdateSeries(s, s.Hash(), now, false, func(l labels.Labels) labels.Labels { return l.Copy() }) + + assert.Equal(t, 1, as.Active()) +} diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index cffb9ca332b..61a7f73d895 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -404,6 +404,9 @@ type userTSDB struct { blockRetentionPeriod int64 postingCache cortex_tsdb.ExpandedPostingsCache + + // Tracks active series per configured tracker pattern. + trackerCounter *trackerCounter } // Explicitly wrapping the tsdb.DB functions that we use. @@ -545,6 +548,7 @@ func (u *userTSDB) PostCreation(metric labels.Labels) { } u.seriesInMetric.increaseSeriesForMetric(metricName) u.labelSetCounter.increaseSeriesLabelSet(u, metric) + u.trackerCounter.increase(metric) if u.postingCache != nil { u.postingCache.ExpireSeries(metric) @@ -563,6 +567,7 @@ func (u *userTSDB) PostDeletion(metrics map[chunks.HeadSeriesRef]labels.Labels) } u.seriesInMetric.decreaseSeriesForMetric(metricName) u.labelSetCounter.decreaseSeriesLabelSet(u, metric) + u.trackerCounter.decrease(metric) if u.postingCache != nil { u.postingCache.ExpireSeries(metric) } @@ -1146,6 +1151,11 @@ func (i *Ingester) updateActiveSeries(ctx context.Context) { if err := userDB.labelSetCounter.UpdateMetric(ctx, userDB, i.metrics); err != nil { level.Warn(i.logger).Log("msg", "failed to update per labelSet metrics", "user", userID, "err", err) } + + // Per-tenant active series trackers (hot-reloadable via runtime config overrides). 
+ trackers := i.limits.ActiveSeriesTrackers(userID) + userDB.trackerCounter.updateConfig(ctx, userDB.db, trackers) + userDB.trackerCounter.updateMetrics(i.metrics.activeSeriesPerTracker, userID, trackers) } } @@ -2868,6 +2878,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { activeQueriedSeries: activeQueriedSeries, seriesInMetric: newMetricCounter(i.limiter, i.cfg.getIgnoreSeriesLimitForMetricNamesMap()), labelSetCounter: newLabelSetCounter(i.limiter), + trackerCounter: newTrackerCounter(), ingestedAPISamples: util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod), ingestedRuleSamples: util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod), diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index 1487696fce3..238c578e656 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -60,6 +60,7 @@ type ingesterMetrics struct { activeQueriedSeriesPerUser *prometheus.GaugeVec limitsPerLabelSet *prometheus.GaugeVec usagePerLabelSet *prometheus.GaugeVec + activeSeriesPerTracker *prometheus.GaugeVec // Global limit metrics maxUsersGauge prometheus.GaugeFunc @@ -297,6 +298,12 @@ func newIngesterMetrics(r prometheus.Registerer, Help: "Number of currently active native histogram series per user.", }, []string{"user"}), + // Not registered automatically, but only if activeSeriesEnabled is true. + activeSeriesPerTracker: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "cortex_ingester_active_series_per_tracker", + Help: "Number of currently active series matching a configured tracker pattern.", + }, []string{"user", "name"}), + // Not registered automatically, but only if activeQueriedSeriesEnabled is true. 
activeQueriedSeriesPerUser: prometheus.NewGaugeVec(prometheus.GaugeOpts{ Name: "cortex_ingester_active_queried_series", @@ -342,6 +349,7 @@ func newIngesterMetrics(r prometheus.Registerer, if activeSeriesEnabled && r != nil { r.MustRegister(m.activeSeriesPerUser) r.MustRegister(m.activeNHSeriesPerUser) + r.MustRegister(m.activeSeriesPerTracker) } if activeQueriedSeriesEnabled && r != nil { @@ -372,6 +380,7 @@ func (m *ingesterMetrics) deletePerUserMetrics(userID string) { m.memMetadataRemovedTotal.DeleteLabelValues(userID) m.activeSeriesPerUser.DeleteLabelValues(userID) m.activeNHSeriesPerUser.DeleteLabelValues(userID) + m.activeSeriesPerTracker.DeletePartialMatch(prometheus.Labels{"user": userID}) m.activeQueriedSeriesPerUser.DeletePartialMatch(prometheus.Labels{"user": userID}) m.usagePerLabelSet.DeletePartialMatch(prometheus.Labels{"user": userID}) m.limitsPerLabelSet.DeletePartialMatch(prometheus.Labels{"user": userID}) diff --git a/pkg/ingester/tracker_counter.go b/pkg/ingester/tracker_counter.go new file mode 100644 index 00000000000..396a3f9a7e5 --- /dev/null +++ b/pkg/ingester/tracker_counter.go @@ -0,0 +1,133 @@ +package ingester + +import ( + "context" + "sync" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/tsdb" + + "github.com/cortexproject/cortex/pkg/util/validation" +) + +// trackerCounter counts active series per configured tracker pattern. +// It increments on series creation and decrements on series deletion, +// similar to labelSetCounter but using PromQL matchers. +type trackerCounter struct { + mu sync.Mutex + counts map[string]int // tracker name -> count + matchers map[string][]*labels.Matcher // tracker name -> compiled matchers +} + +func newTrackerCounter() *trackerCounter { + return &trackerCounter{ + counts: make(map[string]int), + matchers: make(map[string][]*labels.Matcher), + } +} + +// updateConfig applies new tracker configuration. 
If trackers changed, backfills +// counts from the TSDB head index. +func (tc *trackerCounter) updateConfig(ctx context.Context, db *tsdb.DB, trackers validation.ActiveSeriesTrackersConfig) { + tc.mu.Lock() + defer tc.mu.Unlock() + + newMatchers := make(map[string][]*labels.Matcher, len(trackers)) + for i := range trackers { + newMatchers[trackers[i].Name] = trackers[i].ParsedMatchers() + } + + if trackerMatchersEqual(tc.matchers, newMatchers) { + return + } + + // Config changed — backfill counts from TSDB head. + tc.matchers = newMatchers + tc.counts = make(map[string]int, len(newMatchers)) + + if db == nil || len(newMatchers) == 0 { + return + } + + ir, err := db.Head().Index() + if err != nil { + return + } + defer ir.Close() + + // Use PostingsForMatchers per tracker for efficient index-based counting. + for name, m := range tc.matchers { + p, err := tsdb.PostingsForMatchers(ctx, ir, m...) + if err != nil { + continue + } + count := 0 + for p.Next() { + count++ + } + tc.counts[name] = count + } +} + +// increase is called on PostCreation for a new series. +func (tc *trackerCounter) increase(metric labels.Labels) { + tc.mu.Lock() + defer tc.mu.Unlock() + + for name, m := range tc.matchers { + if matchesAll(metric, m) { + tc.counts[name]++ + } + } +} + +// decrease is called on PostDeletion for a removed series. +func (tc *trackerCounter) decrease(metric labels.Labels) { + tc.mu.Lock() + defer tc.mu.Unlock() + + for name, m := range tc.matchers { + if matchesAll(metric, m) { + tc.counts[name]-- + } + } +} + +// updateMetrics reports current counts and cleans up stale tracker metrics. 
+func (tc *trackerCounter) updateMetrics(gauge *prometheus.GaugeVec, userID string, trackers validation.ActiveSeriesTrackersConfig) {
+	tc.mu.Lock()
+	defer tc.mu.Unlock()
+
+	// Reset all tracker series for this user before re-reporting. Trackers
+	// removed from the config have already been dropped from tc.counts by
+	// updateConfig, so iterating tc.counts cannot find them; deleting the
+	// user's series wholesale guarantees no stale metrics survive a reload.
+	gauge.DeletePartialMatch(prometheus.Labels{"user": userID})
+
+	for _, t := range trackers {
+		gauge.WithLabelValues(userID, t.Name).Set(float64(tc.counts[t.Name]))
+	}
+}
+
+func trackerMatchersEqual(a, b map[string][]*labels.Matcher) bool {
+	if len(a) != len(b) {
+		return false
+	}
+	for name, am := range a {
+		bm, ok := b[name]
+		if !ok || len(am) != len(bm) {
+			return false
+		}
+		for i := range am {
+			if am[i].String() != bm[i].String() {
+				return false
+			}
+		}
+	}
+	return true
+}
diff --git a/pkg/util/validation/active_series_tracker.go b/pkg/util/validation/active_series_tracker.go
new file mode 100644
index 00000000000..4a95b790671
--- /dev/null
+++ b/pkg/util/validation/active_series_tracker.go
@@ -0,0 +1,61 @@
+package validation
+
+import (
+	"errors"
+	"fmt"
+
+	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/promql/parser"
+)
+
+var (
+	errActiveSeriesTrackerEmptyName     = errors.New("active series tracker name must not be empty")
+	errActiveSeriesTrackerDuplicateName = errors.New("duplicate active series tracker name")
+)
+
+// ActiveSeriesTrackerConfig defines a single tracker entry that counts active series
+// matching a set of label matchers. This is used for internal monitoring only —
+// no limits are enforced.
+type ActiveSeriesTrackerConfig struct {
+	Name     string `yaml:"name" json:"name" doc:"nocli|description=Name of the tracker, used as a label value in the emitted metric."`
+	Matchers string `yaml:"matchers" json:"matchers" doc:"nocli|description=PromQL series selector (e.g. {__name__=~\"api_.*\"}). 
All matchers must match for a series to be counted."`
+
+	// Parsed matchers, populated during validation.
+	parsedMatchers []*labels.Matcher `yaml:"-" json:"-" doc:"nocli"`
+}
+
+// ParsedMatchers returns the compiled matchers. Must call Validate() first.
+func (c *ActiveSeriesTrackerConfig) ParsedMatchers() []*labels.Matcher {
+	return c.parsedMatchers
+}
+
+// Validate parses the matchers string into compiled label matchers.
+func (c *ActiveSeriesTrackerConfig) Validate() error {
+	if c.Name == "" {
+		return errActiveSeriesTrackerEmptyName
+	}
+	matchers, err := parser.ParseMetricSelector(c.Matchers)
+	if err != nil {
+		return fmt.Errorf("active series tracker %q: %w", c.Name, err)
+	}
+	c.parsedMatchers = matchers
+	return nil
+}
+
+// ActiveSeriesTrackersConfig is a list of tracker configurations.
+type ActiveSeriesTrackersConfig []ActiveSeriesTrackerConfig
+
+// Validate parses and validates all tracker entries, ensuring names are unique and non-empty.
+func (c ActiveSeriesTrackersConfig) Validate() error {
+	names := make(map[string]struct{}, len(c))
+	for i := range c {
+		if err := c[i].Validate(); err != nil {
+			return err
+		}
+		if _, exists := names[c[i].Name]; exists {
+			return fmt.Errorf("%w: %q", errActiveSeriesTrackerDuplicateName, c[i].Name)
+		}
+		names[c[i].Name] = struct{}{}
+	}
+	return nil
+}
diff --git a/pkg/util/validation/active_series_tracker_test.go b/pkg/util/validation/active_series_tracker_test.go
new file mode 100644
index 00000000000..818b51a62d8
--- /dev/null
+++ b/pkg/util/validation/active_series_tracker_test.go
@@ -0,0 +1,114 @@
+package validation
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestActiveSeriesTrackerConfig_Validate(t *testing.T) {
+	t.Run("valid regex", func(t *testing.T) {
+		cfg := ActiveSeriesTrackerConfig{Name: "test", Matchers: `{__name__=~"api_.*"}`}
+		require.NoError(t, cfg.Validate())
+		assert.Len(t, cfg.ParsedMatchers(), 1)
+	})
+
+	t.Run("invalid regex", func(t *testing.T) {
+		cfg := ActiveSeriesTrackerConfig{Name: "test", Matchers: `{__name__=~"[bad"}`}
+		assert.Error(t, cfg.Validate())
+	})
+
+	t.Run("empty matchers", func(t *testing.T) {
+		cfg := ActiveSeriesTrackerConfig{Name: "test", Matchers: ``}
+		assert.Error(t, cfg.Validate())
+	})
+
+	t.Run("empty name", func(t *testing.T) {
+		cfg := ActiveSeriesTrackerConfig{Name: "", Matchers: `{__name__="foo"}`}
+		assert.ErrorIs(t, cfg.Validate(), errActiveSeriesTrackerEmptyName)
+	})
+}
+
+func TestActiveSeriesTrackersConfig_Validate(t *testing.T) {
+	t.Run("all valid", func(t *testing.T) {
+		cfg := ActiveSeriesTrackersConfig{
+			{Name: "a", Matchers: `{__name__="foo"}`},
+			{Name: "b", Matchers: `{__name__=~"bar.*"}`},
+		}
+		require.NoError(t, cfg.Validate())
+	})
+
+	t.Run("one invalid fails all", func(t *testing.T) {
+		cfg := ActiveSeriesTrackersConfig{
+			{Name: "a", Matchers: `{__name__="foo"}`},
+			{Name: "bad", Matchers: `{__name__=~"[bad"}`},
+		}
+		assert.Error(t, cfg.Validate())
+	})
+
+	t.Run("nil is valid", func(t *testing.T) {
+		var cfg ActiveSeriesTrackersConfig
+		require.NoError(t, cfg.Validate())
+	})
+
+	t.Run("duplicate names", func(t *testing.T) {
+		cfg := ActiveSeriesTrackersConfig{
+			{Name: "dup", Matchers: `{__name__="foo"}`},
+			{Name: "dup", Matchers: `{__name__="bar"}`},
+		}
+		assert.ErrorIs(t, cfg.Validate(), errActiveSeriesTrackerDuplicateName)
+	})
+
+	t.Run("empty name in list", func(t *testing.T) {
+		cfg := ActiveSeriesTrackersConfig{
+			{Name: "", Matchers: `{__name__="foo"}`},
+		}
+		assert.ErrorIs(t, cfg.Validate(), errActiveSeriesTrackerEmptyName)
+	})
+}
+
+func TestOverrides_ActiveSeriesTrackers_PerTenant(t *testing.T) {
+	defaults := Limits{}
+	defaults.ActiveSeriesTrackers = ActiveSeriesTrackersConfig{
+		{Name: "default_tracker", Matchers: `{__name__=~".*"}`},
+	}
+	require.NoError(t, defaults.ActiveSeriesTrackers.Validate())
+
+	SetDefaultLimitsForYAMLUnmarshalling(defaults)
+
+	tenantTrackers := ActiveSeriesTrackersConfig{
+		{Name: "tenant_api", Matchers: `{__name__=~"api_.*"}`},
+		{Name: "tenant_node", Matchers: `{__name__=~"node_.*"}`},
+	}
+	require.NoError(t, tenantTrackers.Validate())
+
+	tenantLimits := &Limits{}
+	*tenantLimits = defaults
+	tenantLimits.ActiveSeriesTrackers = tenantTrackers
+
+	overrides, err := NewOverrides(defaults, newMockTenantLimits(map[string]*Limits{
+		"tenant-with-override": tenantLimits,
+	}))
+	require.NoError(t, err)
+
+	// Tenant with override gets tenant-specific trackers.
+	trackers := overrides.ActiveSeriesTrackers("tenant-with-override")
+	require.Len(t, trackers, 2)
+	assert.Equal(t, "tenant_api", trackers[0].Name)
+	assert.Equal(t, "tenant_node", trackers[1].Name)
+
+	// Tenant without override gets default trackers.
+	trackers = overrides.ActiveSeriesTrackers("tenant-without-override")
+	require.Len(t, trackers, 1)
+	assert.Equal(t, "default_tracker", trackers[0].Name)
+}
+
+func TestOverrides_ActiveSeriesTrackers_NoDefaults(t *testing.T) {
+	defaults := Limits{}
+	overrides, err := NewOverrides(defaults, nil)
+	require.NoError(t, err)
+
+	trackers := overrides.ActiveSeriesTrackers("any-tenant")
+	assert.Empty(t, trackers)
+}
diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go
index a9b92c866c5..8765f4c5c7b 100644
--- a/pkg/util/validation/limits.go
+++ b/pkg/util/validation/limits.go
@@ -159,14 +159,15 @@ type Limits struct {
 
 	// Ingester enforced limits.
 	// Series
-	MaxLocalSeriesPerUser                 int                 `yaml:"max_series_per_user" json:"max_series_per_user"`
-	MaxLocalSeriesPerMetric               int                 `yaml:"max_series_per_metric" json:"max_series_per_metric"`
-	MaxLocalNativeHistogramSeriesPerUser  int                 `yaml:"max_native_histogram_series_per_user" json:"max_native_histogram_series_per_user"`
-	MaxGlobalSeriesPerUser                int                 `yaml:"max_global_series_per_user" json:"max_global_series_per_user"`
-	MaxGlobalSeriesPerMetric              int                 `yaml:"max_global_series_per_metric" json:"max_global_series_per_metric"`
-	MaxGlobalNativeHistogramSeriesPerUser int                 `yaml:"max_global_native_histogram_series_per_user" json:"max_global_native_histogram_series_per_user"`
-	LimitsPerLabelSet                     []LimitsPerLabelSet `yaml:"limits_per_label_set" json:"limits_per_label_set" doc:"nocli|description=[Experimental] Enable limits per LabelSet. Supported limits per labelSet: [max_series]"`
-	EnableNativeHistograms                bool                `yaml:"enable_native_histograms" json:"enable_native_histograms"`
+	MaxLocalSeriesPerUser                 int                        `yaml:"max_series_per_user" json:"max_series_per_user"`
+	MaxLocalSeriesPerMetric               int                        `yaml:"max_series_per_metric" json:"max_series_per_metric"`
+	MaxLocalNativeHistogramSeriesPerUser  int                        `yaml:"max_native_histogram_series_per_user" json:"max_native_histogram_series_per_user"`
+	MaxGlobalSeriesPerUser                int                        `yaml:"max_global_series_per_user" json:"max_global_series_per_user"`
+	MaxGlobalSeriesPerMetric              int                        `yaml:"max_global_series_per_metric" json:"max_global_series_per_metric"`
+	MaxGlobalNativeHistogramSeriesPerUser int                        `yaml:"max_global_native_histogram_series_per_user" json:"max_global_native_histogram_series_per_user"`
+	LimitsPerLabelSet                     []LimitsPerLabelSet        `yaml:"limits_per_label_set" json:"limits_per_label_set" doc:"nocli|description=[Experimental] Enable limits per LabelSet. Supported limits per labelSet: [max_series]"`
+	ActiveSeriesTrackers                  ActiveSeriesTrackersConfig `yaml:"active_series_trackers,omitempty" json:"active_series_trackers,omitempty" doc:"nocli|description=List of active series tracker configurations. Each tracker counts active series matching its matchers and exposes the count as a metric."`
+	EnableNativeHistograms                bool                       `yaml:"enable_native_histograms" json:"enable_native_histograms"`
 
 	// Regex matcher query limits.
 	MaxRegexPatternLength int `yaml:"max_regex_pattern_length" json:"max_regex_pattern_length"`
@@ -494,6 +495,10 @@ func (l *Limits) UnmarshalYAML(unmarshal func(any) error) error {
 		return err
 	}
 
+	if err := l.ActiveSeriesTrackers.Validate(); err != nil {
+		return err
+	}
+
 	return nil
 }
 
@@ -524,6 +529,10 @@ func (l *Limits) UnmarshalJSON(data []byte) error {
 		return err
 	}
 
+	if err := l.ActiveSeriesTrackers.Validate(); err != nil {
+		return err
+	}
+
 	return nil
 }
 
@@ -824,6 +833,11 @@ func (o *Overrides) LimitsPerLabelSet(userID string) []LimitsPerLabelSet {
 	return o.GetOverridesForUser(userID).LimitsPerLabelSet
 }
 
+// ActiveSeriesTrackers returns the active series tracker configurations for a given user.
+func (o *Overrides) ActiveSeriesTrackers(userID string) ActiveSeriesTrackersConfig {
+	return o.GetOverridesForUser(userID).ActiveSeriesTrackers
+}
+
 // MaxChunksPerQueryFromStore returns the maximum number of chunks allowed per query when fetching
 // chunks from the long-term storage.
 func (o *Overrides) MaxChunksPerQueryFromStore(userID string) int {
diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json
index 2521b40aca8..d21c1481659 100644
--- a/schemas/cortex-config-schema.json
+++ b/schemas/cortex-config-schema.json
@@ -2,6 +2,19 @@
   "$id": "https://raw.githubusercontent.com/cortexproject/cortex/master/schemas/cortex-config-schema.json",
   "$schema": "https://json-schema.org/draft/2020-12/schema",
   "definitions": {
+    "ActiveSeriesTrackerConfig": {
+      "properties": {
+        "matchers": {
+          "description": "PromQL series selector (e.g. {__name__=~\"api_.*\"}). All matchers must match for a series to be counted.",
+          "type": "string"
+        },
+        "name": {
+          "description": "Name of the tracker, used as a label value in the emitted metric.",
+          "type": "string"
+        }
+      },
+      "type": "object"
+    },
     "DisabledRuleGroup": {
       "properties": {
         "name": {
@@ -4950,6 +4963,14 @@
         "type": "boolean",
         "x-cli-flag": "experimental.distributor.ha-tracker.mixed-ha-samples"
       },
+      "active_series_trackers": {
+        "default": [],
+        "description": "List of active series tracker configurations. Each tracker counts active series matching its matchers and exposes the count as a metric.",
+        "items": {
+          "$ref": "#/definitions/ActiveSeriesTrackerConfig"
+        },
+        "type": "array"
+      },
       "alertmanager_max_alerts_count": {
         "default": 0,
         "description": "Maximum number of alerts that a single user can have. Inserting more alerts will fail with a log message and metric increment. 0 = no limit.",
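For context, a sketch of how this feature would be configured per tenant via the runtime config overrides, using the `active_series_trackers` field added in this change. The tenant ID and tracker names below are illustrative; the `overrides:` layout follows Cortex's standard runtime config format:

```yaml
# Runtime config file (e.g. loaded via -runtime-config.file).
overrides:
  tenant-a:
    active_series_trackers:
      - name: api_series
        matchers: '{__name__=~"api_.*"}'
      - name: prod_node_series
        matchers: '{environment="prod", __name__=~"node_.*"}'
```

With this in place, the ingester would count active series matching each tracker's selector and expose the counts via the `cortex_ingester_active_series_per_tracker` metric, with the tracker name used as a label value.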