Skip to content

Commit 9d6ef5c

Browse files
authored
adding discardOutOfOrder field in write request (#7330)
1 parent 6294bba commit 9d6ef5c

10 files changed

Lines changed: 383 additions & 109 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
* [ENHANCEMENT] Compactor: Add partition group creation time to visit marker. #7217
4545
* [ENHANCEMENT] Compactor: Add concurrency for partition cleanup and mark block for deletion #7246
4646
* [ENHANCEMENT] Distributor: Validate metric name before removing empty labels. #7253
47+
* [ENHANCEMENT] Ruler/Ingester: Propagate append hints to discard out of order samples on Ingester #7226
4748
* [ENHANCEMENT] Make cortex_ingester_tsdb_sample_ooo_delta metric per-tenant #7278
4849
* [ENHANCEMENT] Distributor: Add dimension `nhcb` to keep track of nhcb samples in `cortex_distributor_received_samples_total` and `cortex_distributor_samples_in_total` metrics.
4950
* [ENHANCEMENT] Distributor: Add `-distributor.accept-unknown-remote-write-content-type` flag. When enabled, requests with unknown or invalid Content-Type header are treated as remote write v1 instead of returning 415 Unsupported Media Type. Default is false. #7293

pkg/cortexpb/cortex.pb.go

Lines changed: 146 additions & 95 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/cortexpb/cortex.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ message WriteRequest {
2424

2525
bool skip_label_name_validation = 1000; //set intentionally high to keep WriteRequest compatible with upstream Prometheus
2626
MessageWithBufRef Ref = 1001 [(gogoproto.embed) = true, (gogoproto.customtype) = "MessageWithBufRef", (gogoproto.nullable) = false];
27+
// When true, indicates that out-of-order samples should be discarded even if OOO is enabled.
28+
bool discard_out_of_order = 1002;
2729
}
2830

2931
// refer to https://github.com/prometheus/prometheus/blob/v3.5.0/prompb/io/prometheus/write/v2/types.proto

pkg/cortexpb/timeseries.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ func ReuseWriteRequest(req *PreallocWriteRequest) {
103103
req.Source = 0
104104
req.Metadata = nil
105105
req.Timeseries = nil
106+
req.DiscardOutOfOrder = false
106107
writeRequestPool.Put(req)
107108
}
108109

pkg/distributor/distributor.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -984,7 +984,7 @@ func (d *Distributor) doBatch(ctx context.Context, req *cortexpb.WriteRequest, s
984984
}
985985
}
986986

987-
return d.send(localCtx, ingester, timeseries, metadata, req.Source)
987+
return d.send(localCtx, ingester, timeseries, metadata, req.Source, req.DiscardOutOfOrder)
988988
}, func() {
989989
cortexpb.ReuseSlice(req.Timeseries)
990990
req.Free()
@@ -1252,7 +1252,7 @@ func sortLabelsIfNeeded(labels []cortexpb.LabelAdapter) {
12521252
})
12531253
}
12541254

1255-
func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, timeseries []cortexpb.PreallocTimeseries, metadata []*cortexpb.MetricMetadata, source cortexpb.SourceEnum) error {
1255+
func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, timeseries []cortexpb.PreallocTimeseries, metadata []*cortexpb.MetricMetadata, source cortexpb.SourceEnum, discardOutOfOrder bool) error {
12561256
h, err := d.ingesterPool.GetClientFor(ingester.Addr)
12571257
if err != nil {
12581258
return err
@@ -1270,16 +1270,18 @@ func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, time
12701270

12711271
if d.cfg.UseStreamPush {
12721272
req := &cortexpb.WriteRequest{
1273-
Timeseries: timeseries,
1274-
Metadata: metadata,
1275-
Source: source,
1273+
Timeseries: timeseries,
1274+
Metadata: metadata,
1275+
Source: source,
1276+
DiscardOutOfOrder: discardOutOfOrder,
12761277
}
12771278
_, err = c.PushStreamConnection(ctx, req)
12781279
} else {
12791280
req := cortexpb.PreallocWriteRequestFromPool()
12801281
req.Timeseries = timeseries
12811282
req.Metadata = metadata
12821283
req.Source = source
1284+
req.DiscardOutOfOrder = discardOutOfOrder
12831285

12841286
_, err = c.PushPreAlloc(ctx, req)
12851287

pkg/distributor/distributor_test.go

Lines changed: 105 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,99 @@ func TestDistributor_Push(t *testing.T) {
430430
}
431431
}
432432

433+
func TestDistributor_Push_DiscardOutOfOrder(t *testing.T) {
434+
t.Parallel()
435+
436+
ctx := user.InjectOrgID(context.Background(), "userDiscardOOO")
437+
438+
tests := []struct {
439+
name string
440+
discardOutOfOrder bool
441+
expectedDiscardOOO bool
442+
useStreamPush bool
443+
}{
444+
{
445+
name: "DiscardOutOfOrder=true with regular push",
446+
discardOutOfOrder: true,
447+
expectedDiscardOOO: true,
448+
useStreamPush: false,
449+
},
450+
{
451+
name: "DiscardOutOfOrder=false with regular push",
452+
discardOutOfOrder: false,
453+
expectedDiscardOOO: false,
454+
useStreamPush: false,
455+
},
456+
{
457+
name: "DiscardOutOfOrder=true with stream push",
458+
discardOutOfOrder: true,
459+
expectedDiscardOOO: true,
460+
useStreamPush: true,
461+
},
462+
{
463+
name: "DiscardOutOfOrder=false with stream push",
464+
discardOutOfOrder: false,
465+
expectedDiscardOOO: false,
466+
useStreamPush: true,
467+
},
468+
}
469+
470+
for _, tc := range tests {
471+
t.Run(tc.name, func(t *testing.T) {
472+
t.Parallel()
473+
474+
limits := &validation.Limits{}
475+
flagext.DefaultValues(limits)
476+
477+
ds, ingesters, _, _ := prepare(t, prepConfig{
478+
numIngesters: 3,
479+
happyIngesters: 3,
480+
numDistributors: 1,
481+
shardByAllLabels: true,
482+
limits: limits,
483+
useStreamPush: tc.useStreamPush,
484+
})
485+
486+
request := makeWriteRequest(123456789000, 5, 0, 0)
487+
request.DiscardOutOfOrder = tc.discardOutOfOrder
488+
489+
_, err := ds[0].Push(ctx, request)
490+
require.NoError(t, err)
491+
492+
// Poll to ensure all ingesters have received the push before verifying.
493+
test.Poll(t, time.Second, nil, func() any {
494+
for _, ing := range ingesters {
495+
ing.Lock()
496+
pushCalls := ing.calls["Push"]
497+
lastDiscardOOO := ing.lastDiscardOutOfOrder
498+
ing.Unlock()
499+
500+
// Wait for all ingesters to receive the push call
501+
if pushCalls == 0 {
502+
return fmt.Errorf("ingester has not received push yet")
503+
}
504+
505+
// Wait for the DiscardOutOfOrder flag to match expected value
506+
if lastDiscardOOO != tc.expectedDiscardOOO {
507+
return fmt.Errorf("ingester has DiscardOutOfOrder=%v, expected %v", lastDiscardOOO, tc.expectedDiscardOOO)
508+
}
509+
}
510+
return nil
511+
})
512+
513+
// Final assertion: verify all ingesters received the correct DiscardOutOfOrder flag
514+
for _, ing := range ingesters {
515+
ing.Lock()
516+
lastDiscardOOO := ing.lastDiscardOutOfOrder
517+
ing.Unlock()
518+
519+
assert.Equal(t, tc.expectedDiscardOOO, lastDiscardOOO,
520+
"ingester should have received DiscardOutOfOrder=%v", tc.expectedDiscardOOO)
521+
}
522+
})
523+
}
524+
}
525+
433526
func TestDistributor_MetricsCleanup(t *testing.T) {
434527
t.Parallel()
435528
dists, _, regs, r := prepare(t, prepConfig{
@@ -3604,14 +3697,15 @@ type mockIngester struct {
36043697
sync.Mutex
36053698
client.IngesterClient
36063699
grpc_health_v1.HealthClient
3607-
happy atomic.Bool
3608-
failResp atomic.Error
3609-
stats client.UsersStatsResponse
3610-
timeseries map[uint32]*cortexpb.PreallocTimeseries
3611-
metadata map[uint32]map[cortexpb.MetricMetadata]struct{}
3612-
queryDelay time.Duration
3613-
calls map[string]int
3614-
lblsValues []string
3700+
happy atomic.Bool
3701+
failResp atomic.Error
3702+
stats client.UsersStatsResponse
3703+
timeseries map[uint32]*cortexpb.PreallocTimeseries
3704+
metadata map[uint32]map[cortexpb.MetricMetadata]struct{}
3705+
queryDelay time.Duration
3706+
calls map[string]int
3707+
lblsValues []string
3708+
lastDiscardOutOfOrder bool
36153709
}
36163710

36173711
func newMockIngester(id int, ps *prepState, cfg prepConfig) *mockIngester {
@@ -3682,6 +3776,9 @@ func (i *mockIngester) Push(ctx context.Context, req *cortexpb.WriteRequest, opt
36823776

36833777
i.trackCall("Push")
36843778

3779+
// Store the DiscardOutOfOrder flag for test assertions
3780+
i.lastDiscardOutOfOrder = req.DiscardOutOfOrder
3781+
36853782
if !i.happy.Load() {
36863783
return nil, i.failResp.Load()
36873784
}

pkg/ingester/ingester.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1423,6 +1423,13 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
14231423

14241424
// Walk the samples, appending them to the users database
14251425
app := db.Appender(ctx).(extendedAppender)
1426+
1427+
// Even when OOO is enabled globally, we want to reject OOO samples in some cases.
1428+
// prometheus implementation: https://github.com/prometheus/prometheus/pull/14710
1429+
if req.DiscardOutOfOrder {
1430+
app.SetOptions(&storage.AppendOptions{DiscardOutOfOrder: true})
1431+
}
1432+
14261433
var newSeries []labels.Labels
14271434

14281435
for _, ts := range req.Timeseries {

pkg/ingester/ingester_test.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7863,3 +7863,83 @@ func TestIngester_checkRegexMatcherLimits(t *testing.T) {
78637863
})
78647864
}
78657865
}
7866+
func TestIngester_DiscardOutOfOrderFlagIntegration(t *testing.T) {
7867+
registry := prometheus.NewRegistry()
7868+
cfg := defaultIngesterTestConfig(t)
7869+
cfg.LifecyclerConfig.JoinAfter = 0
7870+
7871+
limits := defaultLimitsTestConfig()
7872+
limits.EnableNativeHistograms = true
7873+
limits.OutOfOrderTimeWindow = model.Duration(60 * time.Minute)
7874+
7875+
i, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, limits, nil, "", registry)
7876+
require.NoError(t, err)
7877+
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
7878+
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
7879+
7880+
// Wait until it's ACTIVE
7881+
test.Poll(t, time.Second, ring.ACTIVE, func() any {
7882+
return i.lifecycler.GetState()
7883+
})
7884+
7885+
ctx := user.InjectOrgID(context.Background(), "test-user")
7886+
7887+
// Create labels for our test metric
7888+
metricLabels := labels.FromStrings(labels.MetricName, "test_metric", "job", "test")
7889+
7890+
currentTime := time.Now().UnixMilli()
7891+
olderTime := currentTime - 60000 // 1 minute earlier (within OOO window)
7892+
7893+
// First, push a sample with current timestamp with discardOutOfOrder=true
7894+
req1 := cortexpb.ToWriteRequest(
7895+
[]labels.Labels{metricLabels},
7896+
[]cortexpb.Sample{{Value: 100, TimestampMs: currentTime}},
7897+
nil, nil, cortexpb.RULE)
7898+
req1.DiscardOutOfOrder = true
7899+
7900+
_, err = i.Push(ctx, req1)
7901+
require.NoError(t, err, "First sample push should succeed")
7902+
7903+
// Now try to push a sample with older timestamp with discardOutOfOrder=true
7904+
// This should be discarded because DiscardOutOfOrder is true
7905+
req2 := cortexpb.ToWriteRequest(
7906+
[]labels.Labels{metricLabels},
7907+
[]cortexpb.Sample{{Value: 50, TimestampMs: olderTime}},
7908+
nil, nil, cortexpb.RULE)
7909+
req2.DiscardOutOfOrder = true
7910+
7911+
_, _ = i.Push(ctx, req2)
7912+
7913+
// Query back the data to ensure only the first (current time) sample was stored
7914+
s := &mockQueryStreamServer{ctx: ctx}
7915+
err = i.QueryStream(&client.QueryRequest{
7916+
StartTimestampMs: olderTime - 1000,
7917+
EndTimestampMs: currentTime + 1000,
7918+
Matchers: []*client.LabelMatcher{
7919+
{Type: client.EQUAL, Name: labels.MetricName, Value: "test_metric"},
7920+
},
7921+
}, s)
7922+
require.NoError(t, err)
7923+
7924+
// Verify we only have one series with one sample (the current time sample)
7925+
require.Len(t, s.series, 1, "Should have exactly one series")
7926+
7927+
// Convert chunks to samples to verify content
7928+
series := s.series[0]
7929+
require.Len(t, series.Chunks, 1, "Should have exactly one chunk")
7930+
7931+
chunk := series.Chunks[0]
7932+
chunkData, err := chunkenc.FromData(chunkenc.EncXOR, chunk.Data)
7933+
require.NoError(t, err)
7934+
7935+
iter := chunkData.Iterator(nil)
7936+
sampleCount := 0
7937+
for iter.Next() != chunkenc.ValNone {
7938+
ts, val := iter.At()
7939+
require.Equal(t, currentTime, ts, "Sample timestamp should match current time")
7940+
require.Equal(t, 100.0, val, "Sample value should match first push")
7941+
sampleCount++
7942+
}
7943+
require.NoError(t, iter.Err())
7944+
require.Equal(t, 1, sampleCount, "Should have exactly one sample stored")
7945+
}

pkg/ruler/compat.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ type PusherAppender struct {
4949
histogramLabels []labels.Labels
5050
histograms []cortexpb.Histogram
5151
userID string
52+
opts *storage.AppendOptions
5253
}
5354

5455
func (a *PusherAppender) AppendHistogram(_ storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
@@ -73,7 +74,9 @@ func (a *PusherAppender) Append(_ storage.SeriesRef, l labels.Labels, t int64, v
7374
return 0, nil
7475
}
7576

76-
func (a *PusherAppender) SetOptions(opts *storage.AppendOptions) {}
77+
func (a *PusherAppender) SetOptions(opts *storage.AppendOptions) {
78+
a.opts = opts
79+
}
7780

7881
func (a *PusherAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
7982
// AppendHistogramCTZeroSample is a no-op for PusherAppender as it happens during scrape time only.
@@ -94,6 +97,12 @@ func (a *PusherAppender) Commit() error {
9497

9598
req := cortexpb.ToWriteRequest(a.labels, a.samples, nil, nil, cortexpb.RULE)
9699
req.AddHistogramTimeSeries(a.histogramLabels, a.histograms)
100+
101+
// Set DiscardOutOfOrder flag if requested via AppendOptions
102+
if a.opts != nil && a.opts.DiscardOutOfOrder {
103+
req.DiscardOutOfOrder = true
104+
}
105+
97106
// Since a.pusher is distributor, client.ReuseSlice will be called in a.pusher.Push.
98107
// We shouldn't call client.ReuseSlice here.
99108
_, err := a.pusher.Push(user.InjectOrgID(a.ctx, a.userID), req)

pkg/ruler/compat_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/prometheus/prometheus/model/value"
1919
"github.com/prometheus/prometheus/promql"
2020
"github.com/prometheus/prometheus/promql/parser"
21+
"github.com/prometheus/prometheus/storage"
2122
"github.com/prometheus/prometheus/tsdb/tsdbutil"
2223
"github.com/stretchr/testify/require"
2324
"github.com/weaveworks/common/httpgrpc"
@@ -413,3 +414,26 @@ func TestRecordAndReportRuleQueryMetrics(t *testing.T) {
413414
require.Equal(t, testutil.ToFloat64(metrics.RulerQueryChunkBytes.WithLabelValues("userID")), float64(10))
414415
require.Equal(t, testutil.ToFloat64(metrics.RulerQueryDataBytes.WithLabelValues("userID")), float64(14))
415416
}
417+
func TestPusherAppender_Commit_WithDiscardOutOfOrder(t *testing.T) {
418+
pusher := &fakePusher{response: &cortexpb.WriteResponse{}}
419+
counter := prometheus.NewCounter(prometheus.CounterOpts{Name: "test"})
420+
421+
appender := &PusherAppender{
422+
ctx: context.Background(),
423+
pusher: pusher,
424+
userID: "test-user",
425+
totalWrites: counter,
426+
failedWrites: counter,
427+
labels: []labels.Labels{labels.FromStrings(labels.MetricName, "test_metric")},
428+
samples: []cortexpb.Sample{{TimestampMs: 1000, Value: 1.0}},
429+
}
430+
431+
appender.SetOptions(&storage.AppendOptions{DiscardOutOfOrder: true})
432+
433+
err := appender.Commit()
434+
require.NoError(t, err)
435+
436+
// Verify that DiscardOutOfOrder was set in the WriteRequest
437+
require.NotNil(t, pusher.request, "WriteRequest should have been sent")
438+
require.True(t, pusher.request.DiscardOutOfOrder, "DiscardOutOfOrder should be true in WriteRequest")
439+
}

0 commit comments

Comments
 (0)