Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
* [ENHANCEMENT] Compactor: Prevent partition compaction to compact any blocks marked for deletion. #7391
* [ENHANCEMENT] Distributor: Optimize memory allocations by reusing the existing capacity of these pooled slices in the Prometheus Remote Write 2.0 path. #7392
* [ENHANCEMENT] Upgrade gRPC from v1.71.2 to v1.79.3 to address CVE-2026-33186. #7460
* [ENHANCEMENT] Distributor: Add HMAC-SHA256 stream authentication for `PushStream` via `-distributor.sign-write-requests-keys`. #7475
* [BUGFIX] Querier: Fix queryWithRetry and labelsWithRetry returning (nil, nil) on cancelled context by propagating ctx.Err(). #7370
* [BUGFIX] Metrics Helper: Fix non-deterministic bucket order in merged histograms by sorting buckets after map iteration, matching Prometheus client library behavior. #7380
* [BUGFIX] Distributor: Return HTTP 401 Unauthorized when tenant ID resolution fails in the Prometheus Remote Write 2.0 path. #7389
Expand Down
11 changes: 11 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3213,6 +3213,17 @@ ha_tracker:
# CLI flag: -distributor.sign-write-requests
[sign_write_requests: <boolean> | default = false]

# EXPERIMENTAL: Comma-separated list of HMAC-SHA256 keys authenticating
# PushStream connections between distributors and ingesters. The first key is
# used by the distributor to sign; all keys are accepted by the ingester. It
# only takes effect when the -distributor.sign-write-requests is true. The key
# change procedure for zero downtime is: (1) redeploy ingesters first with
# 'newkey,oldkey' — ingester accepts both keys; (2) redeploy distributors with
# 'newkey,oldkey' — distributor signs with newkey; (3) once stable, redeploy
# both with 'newkey' to drop the old key.
# CLI flag: -distributor.sign-write-requests-keys
[sign_write_requests_keys: <string> | default = ""]

# EXPERIMENTAL: If enabled, distributor would use stream connection to send
# requests to ingesters.
# CLI flag: -distributor.use-stream-push
Expand Down
1 change: 1 addition & 0 deletions docs/configuration/v1-guarantees.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ Currently experimental features are:
- `-store-gateway.query-protection.rejection`
- Distributor/Ingester: Stream push connection
- Enable stream push connection between distributor and ingester by setting `-distributor.use-stream-push=true` on Distributor.
- Enable stream push authentication on Distributor/Ingester. (`-distributor.sign-write-requests-keys`)
- Add `__type__` and `__unit__` labels to OTLP and remote write v2 requests (`-distributor.enable-type-and-unit-labels`)
- Handle StartTimestampMs (ST) for remote write v2 samples and histograms, using CreatedTimestamp (CT) as a fallback when ST is not set (`-distributor.enable-start-timestamp`)
- Ingester: Series Queried Metric
Expand Down
95 changes: 95 additions & 0 deletions integration/ingester_stream_push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,101 @@ func TestIngesterStreamPushConnection(t *testing.T) {
assertServiceMetricsPrefixes(t, Ingester, ingester3)
}

func TestIngesterStreamPushConnectionWithMatchingSigningKey(t *testing.T) {
const signingKey = "shared-secret-for-integration-test"

s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

flags := BlocksStorageFlags()
flags["-distributor.use-stream-push"] = "true"
flags["-distributor.replication-factor"] = "1"
flags["-distributor.sign-write-requests"] = "true"
flags["-distributor.sign-write-requests-keys"] = signingKey
flags["-ingester.heartbeat-period"] = "1s"
flags["-distributor.ring.heartbeat-period"] = "1s"

consul := e2edb.NewConsul()
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(consul, minio))

distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
ingester1 := e2ecortex.NewIngester("ingester-1", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
require.NoError(t, s.StartAndWaitReady(distributor, ingester1))

require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"},
e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE")),
e2e.WaitMissingMetrics))

now := time.Now()
client, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", userID)
require.NoError(t, err)

// Push a few series; all should succeed because the signing key matches.
for i := 0; i < 5; i++ {
series, _ := generateSeries(fmt.Sprintf("test_signing_ok_%d", i), now)
res, err := client.Push(series)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode,
"push must succeed when distributor and ingester share the same signing key")
}
}

func TestIngesterStreamPushConnectionWithMismatchedSigningKey(t *testing.T) {
const distributorKey = "distributor-key"
const ingesterKey = "ingester-key-different"

s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Distributor signs with distributorKey.
distributorFlags := BlocksStorageFlags()
distributorFlags["-distributor.use-stream-push"] = "true"
distributorFlags["-distributor.replication-factor"] = "1"
distributorFlags["-distributor.sign-write-requests"] = "true"
distributorFlags["-distributor.sign-write-requests-keys"] = distributorKey
distributorFlags["-ingester.heartbeat-period"] = "1s"

// Ingester verifies with ingesterKey (intentionally different).
ingesterFlags := BlocksStorageFlags()
ingesterFlags["-distributor.use-stream-push"] = "true"
ingesterFlags["-distributor.replication-factor"] = "1"
ingesterFlags["-distributor.sign-write-requests"] = "true"
ingesterFlags["-distributor.sign-write-requests-keys"] = ingesterKey
ingesterFlags["-ingester.heartbeat-period"] = "1s"

consul := e2edb.NewConsul()
minio := e2edb.NewMinio(9000, distributorFlags["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(consul, minio))

distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), distributorFlags, "")
ingester1 := e2ecortex.NewIngester("ingester-1", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), ingesterFlags, "")
require.NoError(t, s.StartAndWaitReady(distributor, ingester1))

require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"},
e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE")),
e2e.WaitMissingMetrics))

now := time.Now()
client, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", userID)
require.NoError(t, err)

for i := 0; i < 3; i++ {
series, _ := generateSeries(fmt.Sprintf("test_signing_mismatch_%d", i), now)
res, err := client.Push(series)
if err == nil {
require.NotEqual(t, 200, res.StatusCode,
"push must fail when distributor and ingester use different signing keys")
}
}
}

func TestIngesterStreamPushConnectionWithError(t *testing.T) {

s, err := e2e.NewScenario(networkName)
Expand Down
7 changes: 7 additions & 0 deletions pkg/cortex/cortex.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,13 @@ func (t *Cortex) setupRequestSigning() {
if t.Cfg.Distributor.SignWriteRequestsEnabled {
util_log.WarnExperimentalUse("Distributor SignWriteRequestsEnabled")
t.Cfg.Server.GRPCMiddleware = append(t.Cfg.Server.GRPCMiddleware, grpcclient.UnarySigningServerInterceptor)

// When signing keys are configured, authenticate PushStream connections.
// All keys in the list are accepted by the server; the first key is used by
// the client to sign. Multiple keys enable zero-downtime key rotation.
if keys := t.Cfg.Distributor.SignWriteRequestsKeys.Value(); len(keys) > 0 {
t.Cfg.Server.GRPCStreamMiddleware = append(t.Cfg.Server.GRPCStreamMiddleware, grpcclient.NewStreamSigningServerInterceptor(keys...))
}
}
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,11 @@ func (t *Cortex) initDistributorService() (serv services.Service, err error) {
t.Cfg.Distributor.DistributorRing.ListenPort = t.Cfg.Server.GRPCListenPort
t.Cfg.Distributor.NameValidationScheme = t.Cfg.NameValidationScheme
t.Cfg.IngesterClient.GRPCClientConfig.SignWriteRequestsEnabled = t.Cfg.Distributor.SignWriteRequestsEnabled
// The client signs with the first key in the list; additional keys are only used on the
// server side (ingester) for accepting signatures during key rotation.
if keys := t.Cfg.Distributor.SignWriteRequestsKeys.Value(); len(keys) > 0 {
t.Cfg.IngesterClient.GRPCClientConfig.SignWriteRequestsKey = keys[0]
}

// Check whether the distributor can join the distributors ring, which is
// whenever it's not running as an internal dependency (ie. querier or
Expand Down
17 changes: 10 additions & 7 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
ring_client "github.com/cortexproject/cortex/pkg/ring/client"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/extract"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/labelset"
"github.com/cortexproject/cortex/pkg/util/limiter"
util_log "github.com/cortexproject/cortex/pkg/util/log"
Expand Down Expand Up @@ -153,13 +154,14 @@ type Config struct {
RemoteTimeout time.Duration `yaml:"remote_timeout"`
ExtraQueryDelay time.Duration `yaml:"extra_queue_delay"`

ShardingStrategy string `yaml:"sharding_strategy"`
ShardByAllLabels bool `yaml:"shard_by_all_labels"`
ExtendWrites bool `yaml:"extend_writes"`
SignWriteRequestsEnabled bool `yaml:"sign_write_requests"`
UseStreamPush bool `yaml:"use_stream_push"`
RemoteWriteV2Enabled bool `yaml:"remote_writev2_enabled"`
AcceptUnknownRemoteWriteContentType bool `yaml:"accept_unknown_remote_write_content_type"`
ShardingStrategy string `yaml:"sharding_strategy"`
ShardByAllLabels bool `yaml:"shard_by_all_labels"`
ExtendWrites bool `yaml:"extend_writes"`
SignWriteRequestsEnabled bool `yaml:"sign_write_requests"`
SignWriteRequestsKeys flagext.SecretStringSliceCSV `yaml:"sign_write_requests_keys"`
UseStreamPush bool `yaml:"use_stream_push"`
RemoteWriteV2Enabled bool `yaml:"remote_writev2_enabled"`
AcceptUnknownRemoteWriteContentType bool `yaml:"accept_unknown_remote_write_content_type"`

// Distributors ring
DistributorRing RingConfig `yaml:"ring"`
Expand Down Expand Up @@ -217,6 +219,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.ExtraQueryDelay, "distributor.extra-query-delay", 0, "Time to wait before sending more than the minimum successful query requests.")
f.BoolVar(&cfg.ShardByAllLabels, "distributor.shard-by-all-labels", false, "Distribute samples based on all labels, as opposed to solely by user and metric name.")
f.BoolVar(&cfg.SignWriteRequestsEnabled, "distributor.sign-write-requests", false, "EXPERIMENTAL: If enabled, sign the write request between distributors and ingesters.")
f.Var(&cfg.SignWriteRequestsKeys, "distributor.sign-write-requests-keys", "EXPERIMENTAL: Comma-separated list of HMAC-SHA256 keys authenticating PushStream connections between distributors and ingesters. The first key is used by the distributor to sign; all keys are accepted by the ingester. It only takes effect when the -distributor.sign-write-requests is true. The key change procedure for zero downtime is: (1) redeploy ingesters first with 'newkey,oldkey' — ingester accepts both keys; (2) redeploy distributors with 'newkey,oldkey' — distributor signs with newkey; (3) once stable, redeploy both with 'newkey' to drop the old key.")
f.BoolVar(&cfg.UseStreamPush, "distributor.use-stream-push", false, "EXPERIMENTAL: If enabled, distributor would use stream connection to send requests to ingesters.")
f.StringVar(&cfg.ShardingStrategy, "distributor.sharding-strategy", util.ShardingStrategyDefault, fmt.Sprintf("The sharding strategy to use. Supported values are: %s.", strings.Join(supportedShardingStrategies, ", ")))
f.BoolVar(&cfg.ExtendWrites, "distributor.extend-writes", true, "Try writing to an additional ingester in the presence of an ingester not in the ACTIVE state. It is useful to disable this along with -ingester.unregister-on-shutdown=false in order to not spread samples to extra ingesters during rolling restarts with consistent naming.")
Expand Down
5 changes: 4 additions & 1 deletion pkg/ingester/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"net/http"
"strings"
"sync"

"github.com/go-kit/log"
Expand Down Expand Up @@ -205,8 +206,10 @@ func (c *closableHealthAndIngesterClient) Run(streamPushChan chan *streamWriteJo

var workerErr error
var wg sync.WaitGroup
// Sanitize addr: colons (from host:port) are not allowed in tenant IDs.
sanitizedAddr := strings.ReplaceAll(c.addr, ":", "-")
for i := range INGESTER_CLIENT_STREAM_WORKER_COUNT {
workerName := fmt.Sprintf("ingester-%s-stream-push-worker-%d", c.addr, i)
workerName := fmt.Sprintf("ingester-%s-stream-push-worker-%d", sanitizedAddr, i)
wg.Go(func() {
workerCtx := user.InjectOrgID(streamCtx, workerName)
err := c.worker(workerCtx)
Expand Down
28 changes: 28 additions & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/http"
"os"
"path/filepath"
"regexp"
"runtime"
"runtime/pprof"
"slices"
Expand Down Expand Up @@ -110,6 +111,8 @@ var (
errLabelsOutOfOrder = errors.New("labels out of order")

tsChunksPool zeropool.Pool[[]client.TimeSeriesChunk]

distributorWorkerOrgIDRe = regexp.MustCompile(`^ingester-.+-stream-push-worker-\d+$`)
)

// Config for an Ingester.
Expand Down Expand Up @@ -1724,6 +1727,16 @@ func (i *Ingester) PushStream(srv client.Ingester_PushStreamServer) error {
if err != nil {
return err
}

if contextOrgID, extractErr := users.TenantID(ctx); extractErr == nil {
if !isDistributorWorkerOrgID(contextOrgID) && contextOrgID != req.TenantID {
req.Free()
return status.Errorf(codes.PermissionDenied,
"tenant ID mismatch: stream authenticated as %q but request specifies %q",
contextOrgID, req.TenantID)
}
}

pushCtx := user.InjectOrgID(ctx, req.TenantID)
resp, err := i.Push(pushCtx, req.Request)
if resp == nil {
Expand All @@ -1747,6 +1760,21 @@ func (i *Ingester) PushStream(srv client.Ingester_PushStreamServer) error {
}
}

// isDistributorWorkerOrgID reports whether orgID matches the synthetic worker-name pattern
// that the distributor injects as X-Scope-OrgID when opening a long-lived PushStream:
//
// "ingester-<addr>-stream-push-worker-<N>"
//
// When this pattern is detected, PushStream bypasses the orgID == req.TenantID check and
// instead trusts req.TenantID from the payload.
//
// Note: trusting this pattern alone is not sufficient — an attacker who knows the
// pattern can spoof it and write to any tenant. The stream-level gRPC interceptor
// enabled via -distributor.sign-write-requests-keys provides cryptographic proof.
func isDistributorWorkerOrgID(orgID string) bool {
return distributorWorkerOrgIDRe.MatchString(orgID)
}

func (u *userTSDB) acquireReadLock() error {
u.stateMtx.RLock()
defer u.stateMtx.RUnlock()
Expand Down
Loading
Loading