Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
* Metrics: Renamed `cortex_parquet_queryable_cache_*` to `cortex_parquet_cache_*`.
* Flags: Renamed `-querier.parquet-queryable-shard-cache-size` to `-querier.parquet-shard-cache-size` and `-querier.parquet-queryable-shard-cache-ttl` to `-querier.parquet-shard-cache-ttl`.
* Config: Renamed `parquet_queryable_shard_cache_size` to `parquet_shard_cache_size` and `parquet_queryable_shard_cache_ttl` to `parquet_shard_cache_ttl`.
* [FEATURE] HATracker: Add experimental support for `memberlist` and `multi` as a KV store backend. #7284
* [FEATURE] StoreGateway: Introduces a new parquet mode. #7046
* [FEATURE] StoreGateway: Add a parquet shard cache to parquet mode. #7166
* [FEATURE] Distributor: Add a per-tenant flag `-distributor.enable-type-and-unit-labels` that enables adding `__unit__` and `__type__` labels for remote write v2 and OTLP requests. This is a breaking change; the `-distributor.otlp.enable-type-and-unit-labels` flag is now deprecated, operates as a no-op, and has been consolidated into this new flag. #7077
Expand Down
2 changes: 1 addition & 1 deletion docs/configuration/arguments.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ The next three options only apply when the querier is used together with the Que

### Ring/HA Tracker Store

The KVStore client is used by both the Ring and HA Tracker (HA Tracker doesn't support memberlist as KV store).
The KVStore client is used by both the Ring and HA Tracker (memberlist support in the HA Tracker is experimental).
- `{ring,distributor.ha-tracker}.prefix`
The prefix for the keys in the store. Should end with a /. For example with a prefix of foo/, the key bar would be stored under foo/bar.
- `{ring,distributor.ha-tracker}.store`
Expand Down
5 changes: 2 additions & 3 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3103,9 +3103,8 @@ ha_tracker:
# CLI flag: -distributor.ha-tracker.enable-startup-sync
[enable_startup_sync: <boolean> | default = false]

# Backend storage to use for the ring. Please be aware that memberlist is not
# supported by the HA tracker since gossip propagation is too slow for HA
# purposes.
# Backend storage to use for the ring. Memberlist support in the HA tracker is
# experimental, as gossip propagation delays may impact HA performance.
kvstore:
# Backend storage to use for the ring. Supported values are: consul,
# dynamodb, etcd, inmemory, memberlist, multi.
Expand Down
244 changes: 244 additions & 0 deletions integration/integration_memberlist_single_binary_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"testing"
"time"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

Expand Down Expand Up @@ -246,3 +248,245 @@ func TestSingleBinaryWithMemberlistScaling(t *testing.T) {
"expected all instances to have %f ring members and %f tombstones",
expectedRingMembers, expectedTombstones)
}

// TestHATrackerWithMemberlistClusterSync verifies that HA tracker elections
// propagate across a 3-node single-binary cluster when memberlist is used as
// the HA tracker KV store: an election made through one instance must be
// observed by the other two via gossip, and a later failover must propagate
// the same way.
func TestHATrackerWithMemberlistClusterSync(t *testing.T) {
	scenario, err := e2e.NewScenario(networkName)
	require.NoError(t, err)
	defer scenario.Close()

	// The single binary requires an alertmanager config dir to start.
	require.NoError(t, writeFileToSharedDir(scenario, "alertmanager_configs", []byte{}))

	minio := e2edb.NewMinio(9000, bucketName)
	require.NoError(t, scenario.StartAndWaitReady(minio))

	flags := mergeFlags(BlocksStorageFlags(), map[string]string{
		"-distributor.ha-tracker.enable":               "true",
		"-distributor.ha-tracker.enable-for-all-users": "true",
		"-distributor.ha-tracker.cluster":              "cluster",
		"-distributor.ha-tracker.replica":              "__replica__",
		// Use memberlist as the KV store for the HA tracker.
		"-distributor.ha-tracker.store": "memberlist",

		// Short timeouts so the failover happens quickly in the test.
		"-distributor.ha-tracker.update-timeout":            "1s",
		"-distributor.ha-tracker.update-timeout-jitter-max": "0s",
		"-distributor.ha-tracker.failover-timeout":          "2s",

		// Memberlist configuration shared by all instances.
		"-ring.store":           "memberlist",
		"-memberlist.bind-port": "8000",
	})

	instance1 := newSingleBinary("cortex-1", "", "", flags)
	instance2 := newSingleBinary("cortex-2", "", networkName+"-cortex-1:8000", flags)
	instance3 := newSingleBinary("cortex-3", "", networkName+"-cortex-1:8000", flags)

	require.NoError(t, scenario.StartAndWaitReady(instance1))
	require.NoError(t, scenario.StartAndWaitReady(instance2, instance3))

	// Every instance must have discovered all three members of the gossip cluster.
	require.NoError(t, instance1.WaitSumMetrics(e2e.Equals(3), "memberlist_client_cluster_members_count"))
	require.NoError(t, instance2.WaitSumMetrics(e2e.Equals(3), "memberlist_client_cluster_members_count"))
	require.NoError(t, instance3.WaitSumMetrics(e2e.Equals(3), "memberlist_client_cluster_members_count"))

	// Each Cortex server owns 512 tokens, so every instance should see 3 * 512 in total.
	require.NoError(t, instance1.WaitSumMetrics(e2e.Equals(3*512), "cortex_ring_tokens_total"))
	require.NoError(t, instance2.WaitSumMetrics(e2e.Equals(3*512), "cortex_ring_tokens_total"))
	require.NoError(t, instance3.WaitSumMetrics(e2e.Equals(3*512), "cortex_ring_tokens_total"))

	startTime := time.Now()
	userID := "user-1"

	writer1, err := e2ecortex.NewClient(instance1.HTTPEndpoint(), "", "", "", userID)
	require.NoError(t, err)

	// Elect replica0 as the HA leader by pushing a sample through cortex-1.
	firstSeries, _ := generateSeries("foo", startTime, prompb.Label{Name: "__replica__", Value: "replica0"}, prompb.Label{Name: "cluster", Value: "cluster0"})
	firstRes, err := writer1.Push([]prompb.TimeSeries{firstSeries[0]})
	require.NoError(t, err)
	require.Equal(t, 200, firstRes.StatusCode)

	require.NoError(t, instance1.WaitSumMetrics(e2e.Equals(1), "cortex_ha_tracker_elected_replica_changes_total"))
	// cortex-2 should observe the new HA leader via memberlist gossip.
	require.NoError(t, instance2.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ha_tracker_elected_replica_changes_total"}, e2e.WaitMissingMetrics))
	// cortex-3 should observe the new HA leader via memberlist gossip.
	require.NoError(t, instance3.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ha_tracker_elected_replica_changes_total"}, e2e.WaitMissingMetrics))

	// Give the 2s failover timeout plenty of time to expire.
	time.Sleep(5 * time.Second)

	writer2, err := e2ecortex.NewClient(instance2.HTTPEndpoint(), "", "", "", userID)
	require.NoError(t, err)

	// Pushing from replica1 through cortex-2 should now trigger a failover.
	secondSeries, _ := generateSeries("foo", startTime.Add(time.Second*30), prompb.Label{Name: "__replica__", Value: "replica1"}, prompb.Label{Name: "cluster", Value: "cluster0"})
	secondRes, err := writer2.Push([]prompb.TimeSeries{secondSeries[0]})
	require.NoError(t, err)
	require.Equal(t, 200, secondRes.StatusCode)

	// cortex-2 performed the failover to replica1.
	require.NoError(t, instance2.WaitSumMetrics(e2e.Equals(2), "cortex_ha_tracker_elected_replica_changes_total"))
	// cortex-1 should observe the changed HA leader via memberlist gossip.
	require.NoError(t, instance1.WaitSumMetrics(e2e.Equals(2), "cortex_ha_tracker_elected_replica_changes_total"))
	// cortex-3 should observe the changed HA leader via memberlist gossip.
	require.NoError(t, instance3.WaitSumMetrics(e2e.Equals(2), "cortex_ha_tracker_elected_replica_changes_total"))
}

func TestHATrackerWithMemberlist(t *testing.T) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This integration test is fine. do you think another test with 2 cortex instances would be valuable? I am thinking sending samples to an instance and ensuring the other instance has the same HA leader

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it looks nice.
I updated TestHATrackerWithMemberlistClusterSync to the latest commit

s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// make alert manager config dir
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))

minio := e2edb.NewMinio(9000, bucketName)
require.NoError(t, s.StartAndWaitReady(minio))

flags := mergeFlags(BlocksStorageFlags(), map[string]string{
"-distributor.ha-tracker.enable": "true",
"-distributor.ha-tracker.enable-for-all-users": "true",
"-distributor.ha-tracker.cluster": "cluster",
"-distributor.ha-tracker.replica": "__replica__",
// Use memberlist as the KV store for the HA Tracker
"-distributor.ha-tracker.store": "memberlist",

// To fast failover
"-distributor.ha-tracker.update-timeout": "1s",
"-distributor.ha-tracker.update-timeout-jitter-max": "0s",
"-distributor.ha-tracker.failover-timeout": "2s",

// memberlist config
"-ring.store": "memberlist",
"-memberlist.bind-port": "8000",
})

cortex := newSingleBinary("cortex", "", "", flags)
require.NoError(t, s.StartAndWaitReady(cortex))

require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(1), "memberlist_client_cluster_members_count"))
require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))

now := time.Now()
numUsers := 100

for i := 1; i <= numUsers; i++ {
userID := fmt.Sprintf("user-%d", i)
client, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), "", "", "", userID)
require.NoError(t, err)

series, _ := generateSeries("foo", now, prompb.Label{Name: "__replica__", Value: "replica0"}, prompb.Label{Name: "cluster", Value: "cluster0"})
res, err := client.Push([]prompb.TimeSeries{series[0]})
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)
}

require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(numUsers)), "cortex_ha_tracker_elected_replica_changes_total"))

// Wait 5 seconds to ensure the FailoverTimeout (2s) has comfortably passed.
time.Sleep(5 * time.Second)

for i := 1; i <= numUsers; i++ {
userID := fmt.Sprintf("user-%d", i)
client, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), "", "", "", userID)
require.NoError(t, err)

// This time, we send data from replica1 instead of replica0.
series, _ := generateSeries("foo", now.Add(time.Second*30), prompb.Label{Name: "__replica__", Value: "replica1"}, prompb.Label{Name: "cluster", Value: "cluster0"})
res, err := client.Push([]prompb.TimeSeries{series[0]})
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)
}

// Since the leader successfully failed over to replica1, the change count increments by 1 per user
require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(numUsers*2)), "cortex_ha_tracker_elected_replica_changes_total"))
}

// TestHATrackerWithMultiKV verifies the HA tracker with the "multi" KV store
// backend, using consul as the primary and memberlist as the mirrored
// secondary: elections and a failover must work through the primary, and
// every write must also be mirrored to the secondary store.
func TestHATrackerWithMultiKV(t *testing.T) {
	scenario, err := e2e.NewScenario(networkName)
	require.NoError(t, err)
	defer scenario.Close()

	// The single binary requires an alertmanager config dir to start.
	require.NoError(t, writeFileToSharedDir(scenario, "alertmanager_configs", []byte{}))

	consul := e2edb.NewConsul()
	minio := e2edb.NewMinio(9000, bucketName)
	require.NoError(t, scenario.StartAndWaitReady(consul, minio))

	flags := mergeFlags(BlocksStorageFlags(), map[string]string{
		"-distributor.ha-tracker.enable":               "true",
		"-distributor.ha-tracker.enable-for-all-users": "true",
		"-distributor.ha-tracker.cluster":              "cluster",
		"-distributor.ha-tracker.replica":              "__replica__",
		// Use the multi KV store for the HA tracker.
		"-distributor.ha-tracker.store": "multi",

		// Short timeouts so the failover happens quickly in the test.
		"-distributor.ha-tracker.update-timeout":            "1s",
		"-distributor.ha-tracker.update-timeout-jitter-max": "0s",
		"-distributor.ha-tracker.failover-timeout":          "2s",

		// Multi KV configuration: consul primary, memberlist secondary.
		"-distributor.ha-tracker.multi.primary":   "consul",
		"-distributor.ha-tracker.multi.secondary": "memberlist",
		"-distributor.ha-tracker.consul.hostname": consul.NetworkHTTPEndpoint(),

		// Mirror every write to the secondary store.
		"-distributor.ha-tracker.multi.mirror-enabled": "true",

		// Memberlist configuration.
		"-ring.store":           "memberlist",
		"-memberlist.bind-port": "8000",
	})

	cortex := newSingleBinary("cortex", "", "", flags)
	require.NoError(t, scenario.StartAndWaitReady(cortex))

	// Single-node cluster: one member, 512 ring tokens.
	require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(1), "memberlist_client_cluster_members_count"))
	require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))

	// Mirroring to the secondary store must be enabled.
	require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(1)), "cortex_multikv_mirror_enabled"))
	// Consul must be reported as the primary KV store.
	require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Equals(float64(1)), []string{"cortex_multikv_primary_store"}, e2e.WaitMissingMetrics,
		e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "store", "consul"))),
	)

	startTime := time.Now()
	numUsers := 100

	// First round: every tenant pushes from replica0, electing it as leader.
	for u := 1; u <= numUsers; u++ {
		tenant := fmt.Sprintf("user-%d", u)
		client, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), "", "", "", tenant)
		require.NoError(t, err)

		series, _ := generateSeries("foo", startTime, prompb.Label{Name: "__replica__", Value: "replica0"}, prompb.Label{Name: "cluster", Value: "cluster0"})
		res, err := client.Push([]prompb.TimeSeries{series[0]})
		require.NoError(t, err)
		require.Equal(t, 200, res.StatusCode)
	}

	// One initial election per tenant.
	require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(numUsers)), "cortex_ha_tracker_elected_replica_changes_total"))

	// Give the 2s failover timeout plenty of time to expire.
	time.Sleep(5 * time.Second)

	// Second round: every tenant now pushes from replica1 instead of replica0.
	for u := 1; u <= numUsers; u++ {
		tenant := fmt.Sprintf("user-%d", u)
		client, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), "", "", "", tenant)
		require.NoError(t, err)

		series, _ := generateSeries("foo", startTime.Add(time.Second*30), prompb.Label{Name: "__replica__", Value: "replica1"}, prompb.Label{Name: "cluster", Value: "cluster0"})
		res, err := client.Push([]prompb.TimeSeries{series[0]})
		require.NoError(t, err)
		require.Equal(t, 200, res.StatusCode)
	}

	// Since the leader failed over to replica1, the change count increments by 1 per tenant.
	require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(numUsers*2)), "cortex_ha_tracker_elected_replica_changes_total"))
	// Two keys (1 cluster with 2 replicas) per tenant should be written to the memberlist (secondary store).
	require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(numUsers*2)), "cortex_multikv_mirror_writes_total"))
}
3 changes: 3 additions & 0 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/cortexproject/cortex/pkg/flusher"
"github.com/cortexproject/cortex/pkg/frontend"
"github.com/cortexproject/cortex/pkg/frontend/transport"
"github.com/cortexproject/cortex/pkg/ha"
"github.com/cortexproject/cortex/pkg/ingester"
"github.com/cortexproject/cortex/pkg/overrides"
"github.com/cortexproject/cortex/pkg/parquetconverter"
Expand Down Expand Up @@ -821,6 +822,7 @@ func (t *Cortex) initMemberlistKV() (services.Service, error) {
t.Cfg.MemberlistKV.MetricsRegisterer = reg
t.Cfg.MemberlistKV.Codecs = []codec.Codec{
ring.GetCodec(),
ha.GetReplicaDescCodec(),
}
dnsProviderReg := prometheus.WrapRegistererWithPrefix(
"cortex_",
Expand All @@ -835,6 +837,7 @@ func (t *Cortex) initMemberlistKV() (services.Service, error) {

// Update the config.
t.Cfg.Distributor.DistributorRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.Distributor.HATrackerConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.StoreGateway.ShardingRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.Compactor.ShardingRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
Expand Down
Loading
Loading