Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,28 @@ func runReceive(
return errors.Wrap(err, "creating limiter")
}

// Create tenant attributor if config is provided.
var tenantAttributor *receive.TenantAttributor
if conf.tenantRulesConfig != nil {
tenantRulesContent, err := conf.tenantRulesConfig.Content()
if err != nil {
return errors.Wrap(err, "get content of tenant rules configuration")
}
if len(tenantRulesContent) > 0 {
tenantAttributor, err = receive.NewTenantAttributorFromContent(
tenantRulesContent,
conf.defaultTenantID,
conf.verifyTenantAttribution,
reg,
log.With(logger, "component", "tenant-attributor"),
)
if err != nil {
return errors.Wrap(err, "creating tenant attributor")
}
level.Info(logger).Log("msg", "tenant attribution enabled", "verify_mode", conf.verifyTenantAttribution)
}
}

webHandler := receive.NewHandler(log.With(logger, "component", "receive-handler"), &receive.Options{
Writer: writer,
ListenAddress: conf.rwAddress,
Expand All @@ -322,6 +344,7 @@ func runReceive(
Limiter: limiter,
AsyncForwardWorkerCount: conf.asyncForwardWorkerCount,
ReplicationProtocol: receive.ReplicationProtocol(conf.replicationProtocol),
TenantAttributor: tenantAttributor,
})

{
Expand Down Expand Up @@ -1007,6 +1030,9 @@ type receiveConfig struct {
noUploadTenants *[]string

compactionDelayInterval *model.Duration

tenantRulesConfig *extflag.PathOrContent
verifyTenantAttribution bool
}

func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
Expand Down Expand Up @@ -1203,6 +1229,10 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
rc.noUploadTenants = cmd.Flag("receive.no-upload-tenants", "Tenant IDs/patterns that should only store data locally (no object store upload). Supports exact matches (e.g., 'tenant1') and prefix patterns (e.g., 'prod-*'). Repeat this flag to specify multiple patterns.").Strings()
cmd.Flag("receive.lazy-retrieval-max-buffered-responses", "The lazy retrieval strategy can buffer up to this number of responses. This is to limit the memory usage. This flag takes effect only when the lazy retrieval strategy is enabled.").
Default("20").IntVar(&rc.lazyRetrievalMaxBufferedResponses)

rc.tenantRulesConfig = extflag.RegisterPathOrContent(cmd, "receive.tenant-rules", "YAML file that contains tenant attribution rules. Each rule maps label filters to a tenant ID. Rules are evaluated in order, first match wins.", extflag.WithEnvSubstitution())
cmd.Flag("receive.verify-tenant-attribution", "When enabled, tenant attribution rules are evaluated but only for verification. The HTTP header tenant is still used for actual routing/storage. Metrics are emitted to compare attributed vs HTTP tenant.").
Default("false").BoolVar(&rc.verifyTenantAttribution)
}

// determineMode returns the ReceiverMode that this receiver is configured to run in.
Expand Down
56 changes: 49 additions & 7 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ type Options struct {
Limiter *Limiter
AsyncForwardWorkerCount uint
ReplicationProtocol ReplicationProtocol
TenantAttributor *TenantAttributor
}

// Handler serves a Prometheus remote write receiving HTTP endpoint.
Expand Down Expand Up @@ -511,8 +512,54 @@ func (h *Handler) getStats(r *http.Request, statsByLabelName string) ([]statusap
return h.options.TSDBStats.TenantStats(statsLimit, statsByLabelName, tenantID), nil
}

// tenantKeyForDistribution matches distributeTimeseriesToReplicas semantics exactly.
// getTenantForStorage returns the tenant to use when writing a time series to TSDB.
// This is the actual tenant ID used for storage, unlike tenantKeyForDistribution which
// may include prefixes for hashring distribution purposes.
func (h *Handler) getTenantForStorage(tenantHTTP string, ts prompb.TimeSeries) string {
// If TenantAttributor is configured, the logic is identical to tenantKeyForDistribution.
if h.options.TenantAttributor != nil {
return h.tenantKeyForDistribution(tenantHTTP, ts)
}

// Legacy behavior: use splitTenantLabelName if configured.
tenant := tenantHTTP
if h.splitTenantLabelName != "" {
if tnt, ok := zlabelsGet(ts.Labels, h.splitTenantLabelName); ok && tnt != "" {
tenant = tnt
}
}
return tenant
}

// tenantKeyForDistribution returns the key to use for hashring distribution.
// This may differ from the storage tenant (e.g., by including label name prefix).
//
// Behavior:
// - If verify-attribution is true, use the HTTP tenant for actual routing.
// - If verify-attribution is false and HTTP header is provided (tenant != default), use the HTTP tenant.
// - If verify-attribution is false and HTTP header is not provided, do attribution from labels.
func (h *Handler) tenantKeyForDistribution(tenantHTTP string, ts prompb.TimeSeries) string {
// If TenantAttributor is configured, use rule-based tenant attribution.
if h.options.TenantAttributor != nil {
// In verify mode: attribution for metrics only, use HTTP tenant for actual routing.
if h.options.TenantAttributor.IsVerifyMode() {
lbls := labelpb.ZLabelsToPromLabels(ts.Labels)
attributedTenant := h.options.TenantAttributor.GetTenantFromLabels(lbls)
h.options.TenantAttributor.RecordVerification(attributedTenant, tenantHTTP)
return tenantHTTP
}

// Non-verify mode: if HTTP header was provided (tenant != default), use it.
if tenantHTTP != h.options.DefaultTenantID {
return tenantHTTP
}

// No HTTP header provided: do attribution from labels.
lbls := labelpb.ZLabelsToPromLabels(ts.Labels)
return h.options.TenantAttributor.GetTenantFromLabels(lbls)
}

// Legacy behavior: use splitTenantLabelName if configured.
tenant := tenantHTTP
if h.splitTenantLabelName == "" {
return tenant
Expand Down Expand Up @@ -1140,12 +1187,7 @@ func (h *Handler) sendLocalWrite(

tenantSeriesMapping := map[string][]prompb.TimeSeries{}
for _, ts := range trackedSeries.timeSeries {
var tenant = tenantHTTP
if h.splitTenantLabelName != "" {
if tnt, ok := zlabelsGet(ts.Labels, h.splitTenantLabelName); ok && tnt != "" {
tenant = tnt
}
}
tenant := h.getTenantForStorage(tenantHTTP, ts)
tenantSeriesMapping[tenant] = append(tenantSeriesMapping[tenant], ts)
}

Expand Down
31 changes: 21 additions & 10 deletions pkg/receive/tenant_attribution.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,27 @@ func NewTenantAttributor(
verifyMode bool,
reg prometheus.Registerer,
logger log.Logger,
) (*TenantAttributor, error) {
// Load and parse config file
if configPath == "" {
return nil, errors.New("tenant rules config path is required")
}

configData, err := os.ReadFile(configPath)
if err != nil {
return nil, fmt.Errorf("reading tenant rules config: %w", err)
}

return NewTenantAttributorFromContent(configData, defaultTenant, verifyMode, reg, logger)
}

// NewTenantAttributorFromContent creates a new TenantAttributor from YAML content.
func NewTenantAttributorFromContent(
configData []byte,
defaultTenant string,
verifyMode bool,
reg prometheus.Registerer,
logger log.Logger,
) (*TenantAttributor, error) {
ta := &TenantAttributor{
defaultTenant: defaultTenant,
Expand All @@ -73,16 +94,6 @@ func NewTenantAttributor(
})
}

// Load and parse config file
if configPath == "" {
return nil, errors.New("tenant rules config path is required")
}

configData, err := os.ReadFile(configPath)
if err != nil {
return nil, fmt.Errorf("reading tenant rules config: %w", err)
}

var ruleConfigs []TenantRuleConfig
if err := yaml.UnmarshalStrict(configData, &ruleConfigs); err != nil {
return nil, fmt.Errorf("parsing tenant rules config: %w", err)
Expand Down
Loading