Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions pkg/distributor/ingestion_rate_strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package distributor

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -121,6 +122,107 @@ func TestIngestionRateStrategy(t *testing.T) {
}
}

// TestGlobalIngestionRateStrategy_BurstBehavior verifies that, under the
// global ingestion rate strategy, the rate limit is divided evenly across
// healthy distributors while the burst size is NOT divided, and that a
// token-bucket rate limiter built on the strategy absorbs short bursts but
// throttles sustained over-rate traffic.
func TestGlobalIngestionRateStrategy_BurstBehavior(t *testing.T) {
	t.Parallel()

	const (
		globalRate      = 4000000.0 // 4M samples/sec shared across all distributors
		burstSize       = 800000    // 800K burst, applied per distributor (not divided)
		numDistributors = 10
	)

	// Per-distributor rate should be globalRate / numDistributors = 400K/sec.
	expectedPerDistributorRate := globalRate / float64(numDistributors)

	limits := validation.Limits{
		IngestionRateStrategy: validation.GlobalIngestionRateStrategy,
		IngestionRate:         globalRate,
		IngestionBurstSize:    burstSize,
	}
	overrides := validation.NewOverrides(limits, nil)

	ring := newReadLifecyclerMock()
	ring.On("HealthyInstancesCount").Return(numDistributors)

	strategy := newGlobalIngestionRateStrategy(overrides, ring)

	t.Run("rate is divided across distributors", func(t *testing.T) {
		t.Parallel()
		assert.Equal(t, expectedPerDistributorRate, strategy.Limit("test"))
	})

	t.Run("burst is not divided across distributors", func(t *testing.T) {
		t.Parallel()
		assert.Equal(t, burstSize, strategy.Burst("test"))
	})

	t.Run("token bucket allows burst then throttles sustained over-rate traffic", func(t *testing.T) {
		t.Parallel()
		rl := limiter.NewRateLimiter(strategy, 10*time.Second)
		now := time.Now()

		// The bucket starts full at burstSize (800K tokens).
		// Per-distributor rate refills at 400K tokens/sec.
		// If we consume at 600K/sec (200K over rate), we drain the burst
		// at 200K/sec, so burst lasts 800K / 200K = 4 seconds.

		batchSize := 60000  // 60K samples per batch
		batchesPerSec := 10 // 10 batches/sec = 600K samples/sec

		batchInterval := time.Second / time.Duration(batchesPerSec)
		totalAllowed := 0

		// Track the first rejection with an explicit flag: a zero Duration is
		// a valid rejection time (iteration i == 0), so it must not double as
		// a "no rejection yet" sentinel.
		rejected := false
		var firstRejectedAt time.Duration

		// Simulate 10 seconds of traffic.
		for i := 0; i < batchesPerSec*10; i++ {
			ts := now.Add(batchInterval * time.Duration(i))
			if rl.AllowN(ts, "test", batchSize) {
				totalAllowed++
			} else if !rejected {
				rejected = true
				firstRejectedAt = batchInterval * time.Duration(i)
			}
		}

		// Should allow some batches initially (burst absorbs the overage).
		assert.Greater(t, totalAllowed, 0, "some batches should be allowed initially")

		// Should eventually reject (burst exhausted).
		assert.True(t, rejected, "should eventually reject batches")

		// First rejection should happen around 4 seconds (800K burst / 200K overage per sec).
		// Allow some tolerance due to discrete batch timing.
		assert.Greater(t, firstRejectedAt, 2*time.Second, "burst should sustain overage for more than 2s")
		assert.Less(t, firstRejectedAt, 6*time.Second, "burst should be exhausted before 6s")
	})

	t.Run("sustained traffic at exactly per-distributor rate is not throttled", func(t *testing.T) {
		t.Parallel()
		rl := limiter.NewRateLimiter(strategy, 10*time.Second)
		now := time.Now()

		// Send at exactly the per-distributor rate: 400K/sec in 40K batches, 10/sec.
		batchSize := 40000
		batchesPerSec := 10
		batchInterval := time.Second / time.Duration(batchesPerSec)

		// Simulate 30 seconds of traffic at the exact rate: refill matches
		// consumption, so the bucket never drains and nothing is rejected.
		for i := 0; i < batchesPerSec*30; i++ {
			ts := now.Add(batchInterval * time.Duration(i))
			allowed := rl.AllowN(ts, "test", batchSize)
			assert.True(t, allowed, "batch %d at %v should be allowed at sustained rate", i, batchInterval*time.Duration(i))
		}
	})

	t.Run("single request exceeding burst size is always rejected", func(t *testing.T) {
		t.Parallel()
		rl := limiter.NewRateLimiter(strategy, 10*time.Second)
		now := time.Now()

		// A single request larger than burst size can never fit in the
		// bucket, regardless of timing, so it must always be rejected.
		assert.False(t, rl.AllowN(now, "test", burstSize+1))
	})
}

// readLifecyclerMock mocks the ring's read lifecycler so tests can control
// the reported number of healthy distributor instances (via testify's
// mock.Mock expectation API, e.g. On("HealthyInstancesCount")).
type readLifecyclerMock struct {
	mock.Mock
}
Expand Down
Loading