diff --git a/CHANGELOG.md b/CHANGELOG.md index ed864917403..f6c4e11f5b4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ * [ENHANCEMENT] Compactor: Prevent partition compaction to compact any blocks marked for deletion. #7391 * [ENHANCEMENT] Distributor: Optimize memory allocations by reusing the existing capacity of these pooled slices in the Prometheus Remote Write 2.0 path. #7392 * [ENHANCEMENT] Upgrade gRPC from v1.71.2 to v1.79.3 to address CVE-2026-33186. #7460 +* [ENHANCEMENT] Distributor: Add HMAC-SHA256 stream authentication for `PushStream` via `-distributor.sign-write-requests-keys`. #7475 * [BUGFIX] Querier: Fix queryWithRetry and labelsWithRetry returning (nil, nil) on cancelled context by propagating ctx.Err(). #7370 * [BUGFIX] Metrics Helper: Fix non-deterministic bucket order in merged histograms by sorting buckets after map iteration, matching Prometheus client library behavior. #7380 * [BUGFIX] Distributor: Return HTTP 401 Unauthorized when tenant ID resolution fails in the Prometheus Remote Write 2.0 path. #7389 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 2091fb52ff9..6482d452a09 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3213,6 +3213,17 @@ ha_tracker: # CLI flag: -distributor.sign-write-requests [sign_write_requests: <boolean> | default = false] +# EXPERIMENTAL: Comma-separated list of HMAC-SHA256 keys authenticating +# PushStream connections between distributors and ingesters. The first key is +# used by the distributor to sign; all keys are accepted by the ingester. It +# only takes effect when the -distributor.sign-write-requests is true. 
The key +# change procedure for zero downtime is: (1) redeploy ingesters first with +# 'newkey,oldkey' — ingester accepts both keys; (2) redeploy distributors with +# 'newkey,oldkey' — distributor signs with newkey; (3) once stable, redeploy +# both with 'newkey' to drop the old key. +# CLI flag: -distributor.sign-write-requests-keys +[sign_write_requests_keys: <string> | default = ""] + # EXPERIMENTAL: If enabled, distributor would use stream connection to send # requests to ingesters. # CLI flag: -distributor.use-stream-push diff --git a/docs/configuration/v1-guarantees.md b/docs/configuration/v1-guarantees.md index be3ee78ce59..8af991fd259 100644 --- a/docs/configuration/v1-guarantees.md +++ b/docs/configuration/v1-guarantees.md @@ -114,6 +114,7 @@ Currently experimental features are: - `-store-gateway.query-protection.rejection` - Distributor/Ingester: Stream push connection - Enable stream push connection between distributor and ingester by setting `-distributor.use-stream-push=true` on Distributor. + - Enable stream push authentication on Distributor/Ingester. 
(`-distributor.sign-write-requests-keys`) - Add `__type__` and `__unit__` labels to OTLP and remote write v2 requests (`-distributor.enable-type-and-unit-labels`) - Handle StartTimestampMs (ST) for remote write v2 samples and histograms, using CreatedTimestamp (CT) as a fallback when ST is not set (`-distributor.enable-start-timestamp`) - Ingester: Series Queried Metric diff --git a/integration/ingester_stream_push_test.go b/integration/ingester_stream_push_test.go index 4d9677cf726..db2ee088403 100644 --- a/integration/ingester_stream_push_test.go +++ b/integration/ingester_stream_push_test.go @@ -116,6 +116,101 @@ func TestIngesterStreamPushConnection(t *testing.T) { assertServiceMetricsPrefixes(t, Ingester, ingester3) } +func TestIngesterStreamPushConnectionWithMatchingSigningKey(t *testing.T) { + const signingKey = "shared-secret-for-integration-test" + + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + flags := BlocksStorageFlags() + flags["-distributor.use-stream-push"] = "true" + flags["-distributor.replication-factor"] = "1" + flags["-distributor.sign-write-requests"] = "true" + flags["-distributor.sign-write-requests-keys"] = signingKey + flags["-ingester.heartbeat-period"] = "1s" + flags["-distributor.ring.heartbeat-period"] = "1s" + + consul := e2edb.NewConsul() + minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, s.StartAndWaitReady(consul, minio)) + + distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + ingester1 := e2ecortex.NewIngester("ingester-1", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + require.NoError(t, s.StartAndWaitReady(distributor, ingester1)) + + require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, + e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"), + 
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE")), + e2e.WaitMissingMetrics)) + + now := time.Now() + client, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", userID) + require.NoError(t, err) + + // Push a few series; all should succeed because the signing key matches. + for i := 0; i < 5; i++ { + series, _ := generateSeries(fmt.Sprintf("test_signing_ok_%d", i), now) + res, err := client.Push(series) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode, + "push must succeed when distributor and ingester share the same signing key") + } +} + +func TestIngesterStreamPushConnectionWithMismatchedSigningKey(t *testing.T) { + const distributorKey = "distributor-key" + const ingesterKey = "ingester-key-different" + + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // Distributor signs with distributorKey. + distributorFlags := BlocksStorageFlags() + distributorFlags["-distributor.use-stream-push"] = "true" + distributorFlags["-distributor.replication-factor"] = "1" + distributorFlags["-distributor.sign-write-requests"] = "true" + distributorFlags["-distributor.sign-write-requests-keys"] = distributorKey + distributorFlags["-ingester.heartbeat-period"] = "1s" + + // Ingester verifies with ingesterKey (intentionally different). 
+ ingesterFlags := BlocksStorageFlags() + ingesterFlags["-distributor.use-stream-push"] = "true" + ingesterFlags["-distributor.replication-factor"] = "1" + ingesterFlags["-distributor.sign-write-requests"] = "true" + ingesterFlags["-distributor.sign-write-requests-keys"] = ingesterKey + ingesterFlags["-ingester.heartbeat-period"] = "1s" + + consul := e2edb.NewConsul() + minio := e2edb.NewMinio(9000, distributorFlags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, s.StartAndWaitReady(consul, minio)) + + distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), distributorFlags, "") + ingester1 := e2ecortex.NewIngester("ingester-1", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), ingesterFlags, "") + require.NoError(t, s.StartAndWaitReady(distributor, ingester1)) + + require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, + e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"), + labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE")), + e2e.WaitMissingMetrics)) + + now := time.Now() + client, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", userID) + require.NoError(t, err) + + for i := 0; i < 3; i++ { + series, _ := generateSeries(fmt.Sprintf("test_signing_mismatch_%d", i), now) + res, err := client.Push(series) + if err == nil { + require.NotEqual(t, 200, res.StatusCode, + "push must fail when distributor and ingester use different signing keys") + } + } +} + func TestIngesterStreamPushConnectionWithError(t *testing.T) { s, err := e2e.NewScenario(networkName) diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index 08dcd2d3fbb..c0e4d7fd384 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -423,6 +423,13 @@ func (t *Cortex) setupRequestSigning() { if t.Cfg.Distributor.SignWriteRequestsEnabled { util_log.WarnExperimentalUse("Distributor SignWriteRequestsEnabled") 
t.Cfg.Server.GRPCMiddleware = append(t.Cfg.Server.GRPCMiddleware, grpcclient.UnarySigningServerInterceptor) + + // When signing keys are configured, authenticate PushStream connections. + // All keys in the list are accepted by the server; the first key is used by + // the client to sign. Multiple keys enable zero-downtime key rotation. + if keys := t.Cfg.Distributor.SignWriteRequestsKeys.Value(); len(keys) > 0 { + t.Cfg.Server.GRPCStreamMiddleware = append(t.Cfg.Server.GRPCStreamMiddleware, grpcclient.NewStreamSigningServerInterceptor(keys...)) + } } } diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 3ccd5b2f975..9faf38e6977 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -245,6 +245,11 @@ func (t *Cortex) initDistributorService() (serv services.Service, err error) { t.Cfg.Distributor.DistributorRing.ListenPort = t.Cfg.Server.GRPCListenPort t.Cfg.Distributor.NameValidationScheme = t.Cfg.NameValidationScheme t.Cfg.IngesterClient.GRPCClientConfig.SignWriteRequestsEnabled = t.Cfg.Distributor.SignWriteRequestsEnabled + // The client signs with the first key in the list; additional keys are only used on the + // server side (ingester) for accepting signatures during key rotation. + if keys := t.Cfg.Distributor.SignWriteRequestsKeys.Value(); len(keys) > 0 { + t.Cfg.IngesterClient.GRPCClientConfig.SignWriteRequestsKey = keys[0] + } // Check whether the distributor can join the distributors ring, which is // whenever it's not running as an internal dependency (ie. 
querier or diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index b217a09e9c5..0719b9ca536 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -37,6 +37,7 @@ import ( ring_client "github.com/cortexproject/cortex/pkg/ring/client" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/extract" + "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/labelset" "github.com/cortexproject/cortex/pkg/util/limiter" util_log "github.com/cortexproject/cortex/pkg/util/log" @@ -153,13 +154,14 @@ type Config struct { RemoteTimeout time.Duration `yaml:"remote_timeout"` ExtraQueryDelay time.Duration `yaml:"extra_queue_delay"` - ShardingStrategy string `yaml:"sharding_strategy"` - ShardByAllLabels bool `yaml:"shard_by_all_labels"` - ExtendWrites bool `yaml:"extend_writes"` - SignWriteRequestsEnabled bool `yaml:"sign_write_requests"` - UseStreamPush bool `yaml:"use_stream_push"` - RemoteWriteV2Enabled bool `yaml:"remote_writev2_enabled"` - AcceptUnknownRemoteWriteContentType bool `yaml:"accept_unknown_remote_write_content_type"` + ShardingStrategy string `yaml:"sharding_strategy"` + ShardByAllLabels bool `yaml:"shard_by_all_labels"` + ExtendWrites bool `yaml:"extend_writes"` + SignWriteRequestsEnabled bool `yaml:"sign_write_requests"` + SignWriteRequestsKeys flagext.SecretStringSliceCSV `yaml:"sign_write_requests_keys"` + UseStreamPush bool `yaml:"use_stream_push"` + RemoteWriteV2Enabled bool `yaml:"remote_writev2_enabled"` + AcceptUnknownRemoteWriteContentType bool `yaml:"accept_unknown_remote_write_content_type"` // Distributors ring DistributorRing RingConfig `yaml:"ring"` @@ -217,6 +219,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.ExtraQueryDelay, "distributor.extra-query-delay", 0, "Time to wait before sending more than the minimum successful query requests.") f.BoolVar(&cfg.ShardByAllLabels, 
"distributor.shard-by-all-labels", false, "Distribute samples based on all labels, as opposed to solely by user and metric name.") f.BoolVar(&cfg.SignWriteRequestsEnabled, "distributor.sign-write-requests", false, "EXPERIMENTAL: If enabled, sign the write request between distributors and ingesters.") + f.Var(&cfg.SignWriteRequestsKeys, "distributor.sign-write-requests-keys", "EXPERIMENTAL: Comma-separated list of HMAC-SHA256 keys authenticating PushStream connections between distributors and ingesters. The first key is used by the distributor to sign; all keys are accepted by the ingester. It only takes effect when the -distributor.sign-write-requests is true. The key change procedure for zero downtime is: (1) redeploy ingesters first with 'newkey,oldkey' — ingester accepts both keys; (2) redeploy distributors with 'newkey,oldkey' — distributor signs with newkey; (3) once stable, redeploy both with 'newkey' to drop the old key.") f.BoolVar(&cfg.UseStreamPush, "distributor.use-stream-push", false, "EXPERIMENTAL: If enabled, distributor would use stream connection to send requests to ingesters.") f.StringVar(&cfg.ShardingStrategy, "distributor.sharding-strategy", util.ShardingStrategyDefault, fmt.Sprintf("The sharding strategy to use. Supported values are: %s.", strings.Join(supportedShardingStrategies, ", "))) f.BoolVar(&cfg.ExtendWrites, "distributor.extend-writes", true, "Try writing to an additional ingester in the presence of an ingester not in the ACTIVE state. 
It is useful to disable this along with -ingester.unregister-on-shutdown=false in order to not spread samples to extra ingesters during rolling restarts with consistent naming.") diff --git a/pkg/ingester/client/client.go b/pkg/ingester/client/client.go index c4fa432ce0f..1744238dfd8 100644 --- a/pkg/ingester/client/client.go +++ b/pkg/ingester/client/client.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "net/http" + "strings" "sync" "github.com/go-kit/log" @@ -205,8 +206,10 @@ func (c *closableHealthAndIngesterClient) Run(streamPushChan chan *streamWriteJo var workerErr error var wg sync.WaitGroup + // Sanitize addr: colons (from host:port) are not allowed in tenant IDs. + sanitizedAddr := strings.ReplaceAll(c.addr, ":", "-") for i := range INGESTER_CLIENT_STREAM_WORKER_COUNT { - workerName := fmt.Sprintf("ingester-%s-stream-push-worker-%d", c.addr, i) + workerName := fmt.Sprintf("ingester-%s-stream-push-worker-%d", sanitizedAddr, i) wg.Go(func() { workerCtx := user.InjectOrgID(streamCtx, workerName) err := c.worker(workerCtx) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index cffb9ca332b..f34d2082da8 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -10,6 +10,7 @@ import ( "net/http" "os" "path/filepath" + "regexp" "runtime" "runtime/pprof" "slices" @@ -110,6 +111,8 @@ var ( errLabelsOutOfOrder = errors.New("labels out of order") tsChunksPool zeropool.Pool[[]client.TimeSeriesChunk] + + distributorWorkerOrgIDRe = regexp.MustCompile(`^ingester-.+-stream-push-worker-\d+$`) ) // Config for an Ingester. 
@@ -1724,6 +1727,16 @@ func (i *Ingester) PushStream(srv client.Ingester_PushStreamServer) error { if err != nil { return err } + + if contextOrgID, extractErr := users.TenantID(ctx); extractErr == nil { + if !isDistributorWorkerOrgID(contextOrgID) && contextOrgID != req.TenantID { + req.Free() + return status.Errorf(codes.PermissionDenied, + "tenant ID mismatch: stream authenticated as %q but request specifies %q", + contextOrgID, req.TenantID) + } + } + pushCtx := user.InjectOrgID(ctx, req.TenantID) resp, err := i.Push(pushCtx, req.Request) if resp == nil { @@ -1747,6 +1760,21 @@ func (i *Ingester) PushStream(srv client.Ingester_PushStreamServer) error { } } +// isDistributorWorkerOrgID reports whether orgID matches the synthetic worker-name pattern +// that the distributor injects as X-Scope-OrgID when opening a long-lived PushStream: +// +// "ingester--stream-push-worker-" +// +// When this pattern is detected, PushStream bypasses the orgID == req.TenantID check and +// instead trusts req.TenantID from the payload. +// +// Note: trusting this pattern alone is not sufficient — an attacker who knows the +// pattern can spoof it and write to any tenant. The stream-level gRPC interceptor +// enabled via -distributor.sign-write-requests-keys provides cryptographic proof. 
+func isDistributorWorkerOrgID(orgID string) bool { + return distributorWorkerOrgIDRe.MatchString(orgID) +} + func (u *userTSDB) acquireReadLock() error { u.stateMtx.RLock() defer u.stateMtx.RUnlock() diff --git a/pkg/ingester/pushstream_security_test.go b/pkg/ingester/pushstream_security_test.go new file mode 100644 index 00000000000..83f717570f8 --- /dev/null +++ b/pkg/ingester/pushstream_security_test.go @@ -0,0 +1,133 @@ +package ingester + +import ( + "context" + "io" + "testing" + "time" + + "github.com/gogo/status" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + "github.com/weaveworks/common/user" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + + "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/util/services" + "github.com/cortexproject/cortex/pkg/util/test" +) + +// pushStreamServer is a mock implementation of Ingester_PushStreamServer for testing PushStream. +// Context() injects stream-level authentication (X-Scope-OrgID), +// and Recv() provides the sequence of StreamWriteRequests sent by the client. +type pushStreamServer struct { + grpc.ServerStream + ctx context.Context + requests []*cortexpb.StreamWriteRequest + idx int + sent []*cortexpb.WriteResponse +} + +func (s *pushStreamServer) Context() context.Context { return s.ctx } + +func (s *pushStreamServer) Recv() (*cortexpb.StreamWriteRequest, error) { + if s.idx >= len(s.requests) { + return nil, io.EOF + } + req := s.requests[s.idx] + s.idx++ + return req, nil +} + +func (s *pushStreamServer) Send(resp *cortexpb.WriteResponse) error { + s.sent = append(s.sent, resp) + return nil +} + +// makeWriteReq builds a WriteRequest containing a single sample with the given metric name. 
+func makeWriteReq(metricName string) *cortexpb.WriteRequest { + return &cortexpb.WriteRequest{ + Timeseries: []cortexpb.PreallocTimeseries{ + { + TimeSeries: &cortexpb.TimeSeries{ + Labels: []cortexpb.LabelAdapter{ + {Name: "__name__", Value: metricName}, + }, + Samples: []cortexpb.Sample{ + {Value: 13.37, TimestampMs: time.Now().UnixMilli()}, + }, + }, + }, + }, + } +} + +// newTestIngester creates an ingester instance for PushStream security tests. +func newTestIngester(t *testing.T) *Ingester { + t.Helper() + cfg := defaultIngesterTestConfig(t) + ing, err := prepareIngesterWithBlocksStorage(t, cfg, prometheus.NewRegistry()) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing)) + t.Cleanup(func() { + _ = services.StopAndAwaitTerminated(context.Background(), ing) + }) + test.Poll(t, time.Second, ring.ACTIVE, func() any { + return ing.lifecycler.GetState() + }) + return ing +} + +// TestPushStream_TenantImpersonation reproduces the tenant impersonation vulnerability +// and verifies it is blocked after the fix is applied. +func TestPushStream_TenantImpersonation(t *testing.T) { + ing := newTestIngester(t) + + // The stream is authenticated as user-1 (X-Scope-OrgID: user-1). + streamCtx := user.InjectOrgID(context.Background(), "user-1") + + srv := &pushStreamServer{ + ctx: streamCtx, + requests: []*cortexpb.StreamWriteRequest{ + { + // Impersonation: stream is user-1 but the request claims user-2. 
+				TenantID: "user-2",
+				Request:  makeWriteReq("poc_injected_metric"),
+			},
+		},
+	}
+
+	err := ing.PushStream(srv)
+
+	require.Error(t, err, "a stream authenticated as user-1 must not write for user-2")
+	st, ok := status.FromError(err)
+	require.True(t, ok, "error must be a gRPC status error")
+	require.Equal(t, codes.PermissionDenied, st.Code(),
+		"tenant ID mismatch must result in PermissionDenied")
+}
+
+// TestAttack_DirectGRPC_BypassWithWorkerID proves that a knowledgeable attacker
+// can bypass the tenant check by spoofing the distributor worker ID pattern.
+func TestAttack_DirectGRPC_BypassWithWorkerID(t *testing.T) {
+	ing := newTestIngester(t)
+
+	// "ingester-<addr>-stream-push-worker-<index>"
+	spoofedWorkerCtx := user.InjectOrgID(context.Background(), "ingester-fake-stream-push-worker-0")
+
+	// Step 2: Attacker requests to write into "tenant-B"
+	srv := &pushStreamServer{
+		ctx: spoofedWorkerCtx,
+		requests: []*cortexpb.StreamWriteRequest{
+			{
+				TenantID: "tenant-B",
+				Request:  makeWriteReq("hacked_metric_bypass"),
+			},
+		},
+	}
+
+	err := ing.PushStream(srv)
+
+	require.NoError(t, err, "ATTACK SUCCEEDED: The attacker bypassed the check by spoofing the worker ID, `-distributor.sign-write-requests-keys` should be used to prevent this")
+}
diff --git a/pkg/util/flagext/secretstringslicecsv.go b/pkg/util/flagext/secretstringslicecsv.go
new file mode 100644
index 00000000000..0a1dea7834e
--- /dev/null
+++ b/pkg/util/flagext/secretstringslicecsv.go
@@ -0,0 +1,60 @@
+package flagext
+
+import (
+	"errors"
+	"strings"
+)
+
+// SecretStringSliceCSV is a slice of strings that is parsed from a comma-separated string.
+// It implements flag.Value and yaml Marshalers, but masks the value when marshaled to YAML
+// so that secrets are not exposed via the /config endpoint.
+type SecretStringSliceCSV struct {
+	values []string
+}
+
+// String implements flag.Value
+func (v SecretStringSliceCSV) String() string {
+	return strings.Join(v.values, ",")
+}
+
+// Set implements flag.Value. An empty string clears the list. Any blank entry
+// (for example from "a,,b" or a trailing comma) is rejected and the previous
+// value is kept, because a blank secret must never end up in the key list.
+func (v *SecretStringSliceCSV) Set(s string) error {
+	if s == "" {
+		v.values = nil
+		return nil
+	}
+	parts := strings.Split(s, ",")
+	for _, part := range parts {
+		if strings.TrimSpace(part) == "" {
+			return errors.New("empty entry in comma-separated secret list")
+		}
+	}
+	v.values = parts
+	return nil
+}
+
+// Value returns the underlying string slice.
+func (v SecretStringSliceCSV) Value() []string {
+	return v.values
+}
+
+// UnmarshalYAML implements yaml.Unmarshaler.
+func (v *SecretStringSliceCSV) UnmarshalYAML(unmarshal func(any) error) error {
+	var s string
+	if err := unmarshal(&s); err != nil {
+		return err
+	}
+
+	return v.Set(s)
+}
+
+// MarshalYAML implements yaml.Marshaler.
+// The value is masked to avoid exposing secrets via the /config endpoint.
+func (v SecretStringSliceCSV) MarshalYAML() (any, error) {
+	if len(v.values) == 0 {
+		return "", nil
+	}
+	return "********", nil
+}
diff --git a/pkg/util/flagext/secretstringslicecsv_test.go b/pkg/util/flagext/secretstringslicecsv_test.go
new file mode 100644
index 00000000000..58772e28c11
--- /dev/null
+++ b/pkg/util/flagext/secretstringslicecsv_test.go
@@ -0,0 +1,47 @@
+package flagext
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"gopkg.in/yaml.v2"
+)
+
+func TestSecretStringSliceCSV(t *testing.T) {
+	type TestStruct struct {
+		Keys SecretStringSliceCSV `yaml:"keys"`
+	}
+
+	t.Run("values are masked when marshaled to YAML", func(t *testing.T) {
+		var s TestStruct
+		require.NoError(t, s.Keys.Set("key1,key2,key3"))
+
+		assert.Equal(t, []string{"key1", "key2", "key3"}, s.Keys.Value())
+		assert.Equal(t, "key1,key2,key3", s.Keys.String())
+
+		actual, err := yaml.Marshal(s)
+		require.NoError(t, err)
+		assert.Equal(t, "keys: '********'\n", string(actual))
+	})
+
+	t.Run("values are unmarshaled correctly from YAML", func(t *testing.T) {
+		var s TestStruct
+		require.NoError(t,
yaml.Unmarshal([]byte("keys: key1,key2,key3\n"), &s)) + assert.Equal(t, []string{"key1", "key2", "key3"}, s.Keys.Value()) + }) + + t.Run("empty value marshals to empty string", func(t *testing.T) { + var s TestStruct + actual, err := yaml.Marshal(s) + require.NoError(t, err) + assert.Equal(t, "keys: \"\"\n", string(actual)) + }) + + t.Run("empty string set clears values", func(t *testing.T) { + var s TestStruct + require.NoError(t, s.Keys.Set("key1,key2")) + require.NoError(t, s.Keys.Set("")) + assert.Equal(t, []string(nil), s.Keys.Value()) + }) +} diff --git a/pkg/util/grpcclient/grpcclient.go b/pkg/util/grpcclient/grpcclient.go index 6e219cec979..aeacf943856 100644 --- a/pkg/util/grpcclient/grpcclient.go +++ b/pkg/util/grpcclient/grpcclient.go @@ -33,6 +33,7 @@ type Config struct { TLSEnabled bool `yaml:"tls_enabled"` TLS tls.ClientConfig `yaml:",inline"` SignWriteRequestsEnabled bool `yaml:"-"` + SignWriteRequestsKey string `yaml:"-"` ConnectTimeout time.Duration `yaml:"connect_timeout"` } @@ -130,6 +131,9 @@ func (cfg *Config) DialOption(unaryClientInterceptors []grpc.UnaryClientIntercep if cfg.SignWriteRequestsEnabled { unaryClientInterceptors = append(unaryClientInterceptors, UnarySigningClientInterceptor) + if cfg.SignWriteRequestsKey != "" { + streamClientInterceptors = append(streamClientInterceptors, NewStreamSigningClientInterceptor(cfg.SignWriteRequestsKey)) + } } return append( diff --git a/pkg/util/grpcclient/signing_handler.go b/pkg/util/grpcclient/signing_handler.go index a6f5ee2f736..95b08bed685 100644 --- a/pkg/util/grpcclient/signing_handler.go +++ b/pkg/util/grpcclient/signing_handler.go @@ -2,14 +2,21 @@ package grpcclient import ( "context" + "crypto/hmac" + "crypto/sha256" + "crypto/subtle" + "encoding/hex" "github.com/weaveworks/common/errors" "google.golang.org/grpc" "google.golang.org/grpc/metadata" + + "github.com/cortexproject/cortex/pkg/util/users" ) var ( - reqSignHeaderName = "x-req-signature" + reqSignHeaderName = "x-req-signature" + 
streamSignHeaderName = "x-stream-signature" ) const ( @@ -17,6 +24,8 @@ const ( ErrMultipleSignaturePresent = errors.Error("multiples signature present") ErrSignatureNotPresent = errors.Error("signature not present") ErrSignatureMismatch = errors.Error("signature mismatch") + + pushStreamFullMethod = "/cortex.Ingester/PushStream" ) // SignRequest define the interface that must be implemented by the request structs to be signed @@ -86,3 +95,70 @@ func UnarySigningClientInterceptor(ctx context.Context, method string, req, repl return invoker(newCtx, method, req, reply, cc, opts...) } + +// computeStreamHMAC computes HMAC-SHA256(key, orgID) and returns the hex digest. +func computeStreamHMAC(key, orgID string) string { + mac := hmac.New(sha256.New, []byte(key)) + mac.Write([]byte(orgID)) + return hex.EncodeToString(mac.Sum(nil)) +} + +// NewStreamSigningClientInterceptor returns a gRPC stream client interceptor that injects +// an HMAC-SHA256 signature into the outgoing stream metadata. +func NewStreamSigningClientInterceptor(key string) grpc.StreamClientInterceptor { + return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { + if method != pushStreamFullMethod { + return streamer(ctx, desc, cc, method, opts...) + } + + orgID, err := users.TenantID(ctx) // ingester-%s-stream-push-worker-%d + if err != nil { + return nil, err + } + sig := computeStreamHMAC(key, orgID) + + md, ok := metadata.FromOutgoingContext(ctx) + if !ok { + md = metadata.New(nil) + } else { + md = md.Copy() + } + md.Set(streamSignHeaderName, sig) + + return streamer(metadata.NewOutgoingContext(ctx, md), desc, cc, method, opts...) + } +} + +// NewStreamSigningServerInterceptor returns a gRPC stream server interceptor that verifies +// the HMAC-SHA256 signature injected by NewStreamSigningClientInterceptor. 
+func NewStreamSigningServerInterceptor(keys ...string) grpc.StreamServerInterceptor { + return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + if info.FullMethod != pushStreamFullMethod { + return handler(srv, ss) + } + + ctx := ss.Context() + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return ErrSignatureNotPresent + } + + sigs := md.Get(streamSignHeaderName) + if len(sigs) != 1 { + return ErrSignatureNotPresent + } + + orgID, err := users.TenantID(ctx) // ingester-%s-stream-push-worker-%d + if err != nil { + return err + } + sig := sigs[0] + for _, key := range keys { + expectedSig := computeStreamHMAC(key, orgID) + if subtle.ConstantTimeCompare([]byte(sig), []byte(expectedSig)) == 1 { + return handler(srv, ss) + } + } + return ErrSignatureMismatch + } +} diff --git a/pkg/util/grpcclient/signing_handler_test.go b/pkg/util/grpcclient/signing_handler_test.go index 07193055a03..76b0f882f64 100644 --- a/pkg/util/grpcclient/signing_handler_test.go +++ b/pkg/util/grpcclient/signing_handler_test.go @@ -65,3 +65,120 @@ func TestUnarySigningHandler(t *testing.T) { }) require.ErrorIs(t, err, ErrMultipleSignaturePresent) } + +// mockServerStream is a minimal grpc.ServerStream stub for testing stream interceptors. 
+type mockServerStream struct { + grpc.ServerStream + ctx context.Context +} + +func (m *mockServerStream) Context() context.Context { return m.ctx } + +var pushStreamInfo = &grpc.StreamServerInfo{FullMethod: pushStreamFullMethod} + +func TestStreamSigningInterceptors(t *testing.T) { + const testKey = "super-secret-signing-key" + orgID := "ingester-fake-stream-push-worker-0" + + sign := func(key string) metadata.MD { + clientCtx := user.InjectOrgID(context.Background(), orgID) + var capturedCtx context.Context + interceptor := NewStreamSigningClientInterceptor(key) + _, _ = interceptor(clientCtx, nil, nil, pushStreamFullMethod, + func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) { + capturedCtx = ctx + return nil, nil + }) + outMD, _ := metadata.FromOutgoingContext(capturedCtx) + return outMD + } + + verify := func(interceptor grpc.StreamServerInterceptor, incomingMD metadata.MD) error { + serverCtx := metadata.NewIncomingContext(user.InjectOrgID(context.Background(), orgID), incomingMD) + ss := &mockServerStream{ctx: serverCtx} + return interceptor(nil, ss, pushStreamInfo, func(srv any, stream grpc.ServerStream) error { return nil }) + } + + t.Run("valid signature is accepted", func(t *testing.T) { + interceptor := NewStreamSigningServerInterceptor(testKey) + require.NoError(t, verify(interceptor, sign(testKey))) + }) + + t.Run("wrong key is rejected with ErrSignatureMismatch", func(t *testing.T) { + interceptor := NewStreamSigningServerInterceptor(testKey) + err := verify(interceptor, sign("wrong-key")) + require.ErrorIs(t, err, ErrSignatureMismatch) + }) + + t.Run("missing signature is rejected with ErrSignatureNotPresent", func(t *testing.T) { + // No client interceptor – server receives a stream with no signature header. 
+ interceptor := NewStreamSigningServerInterceptor(testKey) + err := verify(interceptor, metadata.New(nil)) + require.ErrorIs(t, err, ErrSignatureNotPresent) + }) + + t.Run("non-PushStream method bypasses verification", func(t *testing.T) { + // Server interceptor must pass through unrelated streaming RPCs unchanged. + interceptor := NewStreamSigningServerInterceptor(testKey) + serverCtx := metadata.NewIncomingContext(user.InjectOrgID(context.Background(), orgID), metadata.New(nil)) + ss := &mockServerStream{ctx: serverCtx} + otherInfo := &grpc.StreamServerInfo{FullMethod: "/cortex.Querier/QueryStream"} + err := interceptor(nil, ss, otherInfo, func(srv any, stream grpc.ServerStream) error { return nil }) + require.NoError(t, err) + }) +} + +// TestStreamSigningKeyRotation verifies that NewStreamSigningServerInterceptor accepts a +// signature produced with any of the configured keys, enabling zero-downtime key rotation. +func TestStreamSigningKeyRotation(t *testing.T) { + const oldKey = "old-secret" + const newKey = "new-secret" + orgID := "ingester-addr-stream-push-worker-0" + + sign := func(key string) metadata.MD { + clientCtx := user.InjectOrgID(context.Background(), orgID) + var capturedCtx context.Context + _, _ = NewStreamSigningClientInterceptor(key)(clientCtx, nil, nil, pushStreamFullMethod, + func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) { + capturedCtx = ctx + return nil, nil + }) + outMD, _ := metadata.FromOutgoingContext(capturedCtx) + return outMD + } + + verify := func(interceptor grpc.StreamServerInterceptor, incomingMD metadata.MD) error { + serverCtx := metadata.NewIncomingContext(user.InjectOrgID(context.Background(), orgID), incomingMD) + ss := &mockServerStream{ctx: serverCtx} + return interceptor(nil, ss, pushStreamInfo, func(srv any, stream grpc.ServerStream) error { return nil }) + } + + // Step1: Ingester is configured with both keys (transition 
period). + // Distributor still signs with oldKey (not yet redeployed). + transitInterceptor := NewStreamSigningServerInterceptor(newKey, oldKey) + + t.Run("step1: old-key signature accepted during transition", func(t *testing.T) { + require.NoError(t, verify(transitInterceptor, sign(oldKey))) + }) + + t.Run("step1: new-key signature also accepted during transition", func(t *testing.T) { + require.NoError(t, verify(transitInterceptor, sign(newKey))) + }) + + // Step 2: Distributor redeployed with newKey; ingester still in transition. + t.Run("step2: distributor now signs with new key, still accepted", func(t *testing.T) { + require.NoError(t, verify(transitInterceptor, sign(newKey))) + }) + + // Step 3: Ingester redeployed with newKey only. + finalInterceptor := NewStreamSigningServerInterceptor(newKey) + + t.Run("step3: new-key signature accepted after rotation complete", func(t *testing.T) { + require.NoError(t, verify(finalInterceptor, sign(newKey))) + }) + + t.Run("step3: old-key signature rejected after rotation complete", func(t *testing.T) { + err := verify(finalInterceptor, sign(oldKey)) + require.ErrorIs(t, err, ErrSignatureMismatch) + }) +} diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index 2521b40aca8..6aa466571de 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -4104,6 +4104,11 @@ "type": "boolean", "x-cli-flag": "distributor.sign-write-requests" }, + "sign_write_requests_keys": { + "description": "EXPERIMENTAL: Comma-separated list of HMAC-SHA256 keys authenticating PushStream connections between distributors and ingesters. The first key is used by the distributor to sign; all keys are accepted by the ingester. It only takes effect when the -distributor.sign-write-requests is true. 
The key change procedure for zero downtime is: (1) redeploy ingesters first with 'newkey,oldkey' — ingester accepts both keys; (2) redeploy distributors with 'newkey,oldkey' — distributor signs with newkey; (3) once stable, redeploy both with 'newkey' to drop the old key.", + "type": "string", + "x-cli-flag": "distributor.sign-write-requests-keys" + }, "use_stream_push": { "default": false, "description": "EXPERIMENTAL: If enabled, distributor would use stream connection to send requests to ingesters.", diff --git a/tools/doc-generator/parser.go b/tools/doc-generator/parser.go index 64b40fc864e..001f8d208b8 100644 --- a/tools/doc-generator/parser.go +++ b/tools/doc-generator/parser.go @@ -285,6 +285,8 @@ func getFieldType(t reflect.Type) (string, error) { return "string", nil case "flagext.StringSliceCSV": return "string", nil + case "flagext.SecretStringSliceCSV": + return "string", nil case "flagext.CIDRSliceCSV": return "string", nil case "[]*relabel.Config": @@ -408,6 +410,22 @@ func getCustomFieldEntry(parent reflect.Type, field reflect.StructField, fieldVa fieldDefault: fieldFlag.DefValue, }, nil } + if field.Type == reflect.TypeFor[flagext.SecretStringSliceCSV]() { + fieldFlag, err := getFieldFlag(parent, field, fieldValue, flags) + if err != nil { + return nil, err + } + + return &configEntry{ + kind: "field", + name: getFieldName(field), + required: isFieldRequired(field), + fieldFlag: fieldFlag.Name, + fieldDesc: fieldFlag.Usage, + fieldType: "string", + fieldDefault: fieldFlag.DefValue, + }, nil + } if field.Type == reflect.TypeFor[model.Duration]() { fieldFlag, err := getFieldFlag(parent, field, fieldValue, flags) if err != nil {