diff --git a/CHANGELOG.md b/CHANGELOG.md
index 44ced0e184..019636f05a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,7 @@
 # Changelog
 
 ## master / unreleased
+* [FEATURE] Memberlist: Add `-memberlist.cluster-label` and `-memberlist.cluster-label-verification-disabled` to prevent accidental cross-cluster gossip joins and support rolling label rollout. #7385
 * [ENHANCEMENT] Metrics Helper: Add native histogram support for aggregating and merging, including dual-format histogram handling that exposes both native and classic bucket formats. #7359
 * [ENHANCEMENT] Cache: Add per-tenant TTL configuration for query results cache to control cache expiration on a per-tenant basis with separate TTLs for regular and out-of-order data. #7357
 * [ENHANCEMENT] Tenant Federation: Add a local cache to regex resolver. #7363
diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md
index 7c1cd7265d..8dee6cdf8e 100644
--- a/docs/configuration/config-file-reference.md
+++ b/docs/configuration/config-file-reference.md
@@ -4564,6 +4564,20 @@ The `memberlist_config` configures the Gossip memberlist.
 # CLI flag: -memberlist.advertise-port
 [advertise_port: <int> | default = 7946]
 
+# The cluster label is an optional string to include in outbound packets and
+# gossip streams. Other members in the memberlist cluster will discard any
+# message whose label doesn't match the configured one, unless the
+# 'cluster-label-verification-disabled' configuration option is set to true.
+# CLI flag: -memberlist.cluster-label
+[cluster_label: <string> | default = ""]
+
+# When true, memberlist doesn't verify that inbound packets and gossip streams
+# have the cluster label matching the configured one. This verification should
+# be disabled while rolling out the change to the configured cluster label in a
+# live memberlist cluster.
+# CLI flag: -memberlist.cluster-label-verification-disabled
+[cluster_label_verification_disabled: <boolean> | default = false]
+
 # Other cluster members to join. Can be specified multiple times. It can be an
 # IP, hostname or an entry specified in the DNS Service Discovery format.
 # CLI flag: -memberlist.join
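For quick reference, the two options above combine like this in a `memberlist` block (a sketch; the label value is illustrative, not from this PR):

```yaml
memberlist:
  # Must be identical on every member that should gossip together.
  cluster_label: "cortex-prod"
  # Keep verification enabled (the default) except during a label rollout.
  cluster_label_verification_disabled: false
```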
diff --git a/docs/configuration/single-process-config-blocks-gossip-1.yaml b/docs/configuration/single-process-config-blocks-gossip-1.yaml
index 11f147e8ff..ad3770974c 100644
--- a/docs/configuration/single-process-config-blocks-gossip-1.yaml
+++ b/docs/configuration/single-process-config-blocks-gossip-1.yaml
@@ -57,6 +57,7 @@ ingester:
 
 memberlist:
   bind_port: 7946
+  cluster_label: gossip-demo
   join_members:
     - localhost:7947
   abort_if_cluster_join_fails: false
diff --git a/docs/configuration/single-process-config-blocks-gossip-2.yaml b/docs/configuration/single-process-config-blocks-gossip-2.yaml
index db1683afc3..35840ff7b3 100644
--- a/docs/configuration/single-process-config-blocks-gossip-2.yaml
+++ b/docs/configuration/single-process-config-blocks-gossip-2.yaml
@@ -56,6 +56,7 @@ ingester:
 
 memberlist:
   bind_port: 7947
+  cluster_label: gossip-demo
   join_members:
     - localhost:7946
   abort_if_cluster_join_fails: false
diff --git a/docs/guides/gossip-ring-getting-started.md b/docs/guides/gossip-ring-getting-started.md
index 13cf2c493b..0ca7bd6650 100644
--- a/docs/guides/gossip-ring-getting-started.md
+++ b/docs/guides/gossip-ring-getting-started.md
@@ -50,6 +50,7 @@ memberlist:
   # defaults to hostname
   node_name: "Ingester 1"
   bind_port: 7946
+  cluster_label: "gossip-demo"
   join_members:
     - localhost:7947
   abort_if_cluster_join_fails: false
@@ -127,9 +128,10 @@ We don't need to change or add `memberlist.join_members` list.
 This new instance will discover other peers through it. When using Kubernetes, the suggested setup is to have a headless service pointing to all pods that want to be part of the gossip cluster, and then point `join_members` to this headless service.
 
+In production, set `memberlist.cluster_label` to the same value on every Cortex process that should share the same gossip cluster. This helps avoid accidentally merging rings with other Cortex, Mimir, or Loki deployments that can reach the same seed addresses.
+
 We also don't need to change `/tmp/cortex/storage` directory in the `blocks_storage.filesystem.dir` field. This is the directory where all ingesters will "upload" finished blocks.
 This can also be an S3 or GCP storage, but for simplicity, we use the local filesystem in this example.
 
 After these changes, we can start another Cortex instance using the modified configuration file. This instance will join the ring and will start receiving samples after it enters the ACTIVE state.
-
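The production recommendation added above is easiest to see side by side. A sketch (hostnames and label values are illustrative, not from this PR) of two deployments that can reach the same seed address yet never merge, because their labels differ:

```yaml
# Deployment A: all members share one label.
memberlist:
  cluster_label: "cortex-prod"
  join_members:
    - gossip-seed.example.internal:7946
---
# Deployment B: the same seed is reachable, but a different label means
# packets from deployment A are discarded and the rings stay separate.
memberlist:
  cluster_label: "loki-prod"
  join_members:
    - gossip-seed.example.internal:7946
```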
diff --git a/docs/guides/migration-kv-store-to-memberlist.md b/docs/guides/migration-kv-store-to-memberlist.md
index 8ebbcb0ac2..951479a4bc 100644
--- a/docs/guides/migration-kv-store-to-memberlist.md
+++ b/docs/guides/migration-kv-store-to-memberlist.md
@@ -29,8 +29,10 @@ Update your configuration file and deploy the changes:
 ring:
   store: memberlist
 memberlist:
-  abort_if_join_fails: false
+  abort_if_cluster_join_fails: false
   bind_port: <port>
+  cluster_label: <cluster-label>
+  cluster_label_verification_disabled: true
   join_members:
     - gossip-ring.<namespace>.svc.cluster.local:<port>
 ...
@@ -54,6 +56,8 @@ ingester:
 > The Memberlist gossip protocol requires a bit of time to propagate the state across the cluster. Setting a 60-second delay ensures that the ingester has enough time to fully sync the existing ring topology from other peers before actively joining and receiving traffic.
 >
 > **Note:** Make sure to apply this multi KV store configuration to all other components that interact with the ring (e.g. distributors, store-gateways), not just the ingesters.
+>
+> **Note:** If multiple Cortex, Mimir, or Loki clusters could reach the same gossip seed addresses, configure a shared `memberlist.cluster_label` for your Cortex cluster. For a fresh Memberlist rollout, you can deploy the shared label together with `memberlist.cluster_label_verification_disabled: true`, then set the verification option back to `false` once every memberlist-enabled Cortex process uses the same label. If you are adding a label to an existing unlabeled Memberlist cluster, first roll out `memberlist.cluster_label_verification_disabled: true` everywhere while leaving `memberlist.cluster_label` empty, then roll out the shared label, and finally set the verification option back to `false`. This isolates Memberlist traffic only; it does not isolate Consul or Etcd prefixes.
 
 Once deployed, Cortex will begin mirroring primary (Consul) data to Memberlist.
@@ -87,4 +91,6 @@ ingester:
 ```
 
 > **Note:** Again, ensure this update is applied across all components.
 
-After the updated configuration is fully deployed across your cluster and everything is running stably, you can remove your Consul cluster.
\ No newline at end of file
+After the updated configuration is fully deployed across your cluster and everything is running stably, you can remove your Consul cluster.
+
+If you enabled `memberlist.cluster_label_verification_disabled: true` during the migration, finish the rollout by setting it back to `false` once every memberlist-enabled Cortex process is using the same `memberlist.cluster_label`.
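The rollout sequence described in the note above, sketched as three configuration phases (the label value is illustrative, not from this PR):

```yaml
# Phase 1: disable inbound verification everywhere; label stays empty.
memberlist:
  cluster_label: ""
  cluster_label_verification_disabled: true
---
# Phase 2: roll out the shared label; verification still disabled.
memberlist:
  cluster_label: "cortex-prod"
  cluster_label_verification_disabled: true
---
# Phase 3: once every member carries the label, re-enable verification.
memberlist:
  cluster_label: "cortex-prod"
  cluster_label_verification_disabled: false
```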
diff --git a/integration/integration_memberlist_single_binary_test.go b/integration/integration_memberlist_single_binary_test.go
index 14a720d9a9..f462d820b5 100644
--- a/integration/integration_memberlist_single_binary_test.go
+++ b/integration/integration_memberlist_single_binary_test.go
@@ -39,6 +39,94 @@ func TestSingleBinaryWithMemberlist(t *testing.T) {
 	})
 }
 
+func TestSingleBinaryWithMemberlistClusterLabelIsolation(t *testing.T) {
+	s, err := e2e.NewScenario(networkName)
+	require.NoError(t, err)
+	defer s.Close()
+
+	require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))
+
+	minio := e2edb.NewMinio(9000, bucketName)
+	require.NoError(t, s.StartAndWaitReady(minio))
+
+	clusterA1 := newSingleBinary("cluster-a-1", "", "", map[string]string{
+		"-memberlist.cluster-label":       "cluster-a",
+		"-memberlist.abort-if-join-fails": "false",
+	})
+	clusterA2 := newSingleBinary("cluster-a-2", "", networkName+"-cluster-a-1:8000", map[string]string{
+		"-memberlist.cluster-label":       "cluster-a",
+		"-memberlist.abort-if-join-fails": "false",
+	})
+	clusterB1 := newSingleBinary("cluster-b-1", "", networkName+"-cluster-a-1:8000", map[string]string{
+		"-memberlist.cluster-label":       "cluster-b",
+		"-memberlist.abort-if-join-fails": "false",
+	})
+	clusterB2 := newSingleBinary("cluster-b-2", "", networkName+"-cluster-b-1:8000", map[string]string{
+		"-memberlist.cluster-label":       "cluster-b",
+		"-memberlist.abort-if-join-fails": "false",
+	})
+
+	require.NoError(t, s.StartAndWaitReady(clusterA1))
+	require.NoError(t, s.StartAndWaitReady(clusterB1))
+	require.NoError(t, s.StartAndWaitReady(clusterA2, clusterB2))
+
+	requireMemberlistClusterState(t, 2, 2*512, clusterA1, clusterA2)
+	requireMemberlistClusterState(t, 2, 2*512, clusterB1, clusterB2)
+
+	// Verify cross-cluster isolation: clusterB1 must NOT see clusterA members.
+	// Wait a short observation window to ensure member counts remain stable.
+	time.Sleep(5 * time.Second)
+	requireMemberlistClusterState(t, 2, 2*512, clusterA1, clusterA2)
+	requireMemberlistClusterState(t, 2, 2*512, clusterB1, clusterB2)
+}
+
+func TestSingleBinaryWithMemberlistClusterLabelRollingMigration(t *testing.T) {
+	s, err := e2e.NewScenario(networkName)
+	require.NoError(t, err)
+	defer s.Close()
+
+	require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))
+
+	minio := e2edb.NewMinio(9000, bucketName)
+	require.NoError(t, s.StartAndWaitReady(minio))
+
+	const clusterLabel = "migration-cluster"
+
+	configs := []struct {
+		name string
+		join string
+	}{
+		{name: "migration-cortex-1", join: networkName + "-migration-cortex-2:8000"},
+		{name: "migration-cortex-2", join: networkName + "-migration-cortex-1:8000"},
+		{name: "migration-cortex-3", join: networkName + "-migration-cortex-1:8000"},
+	}
+
+	cortexServices := make([]*e2ecortex.CortexService, 0, len(configs))
+	for _, cfg := range configs {
+		cortexServices = append(cortexServices, newMigrationSingleBinary(cfg.name, cfg.join, "", true))
+	}
+
+	require.NoError(t, s.StartAndWaitReady(cortexServices[0]))
+	require.NoError(t, s.StartAndWaitReady(cortexServices[1], cortexServices[2]))
+	requireMemberlistClusterState(t, 3, 3*512, cortexServices...)
+
+	for i, cfg := range configs {
+		replacement := newMigrationSingleBinary(cfg.name, cfg.join, clusterLabel, true)
+		require.NoError(t, s.Stop(cortexServices[i]))
+		require.NoError(t, s.StartAndWaitReady(replacement))
+		cortexServices[i] = replacement
+		requireMemberlistClusterState(t, 3, 3*512, cortexServices...)
+	}
+
+	for i, cfg := range configs {
+		replacement := newMigrationSingleBinary(cfg.name, cfg.join, clusterLabel, false)
+		require.NoError(t, s.Stop(cortexServices[i]))
+		require.NoError(t, s.StartAndWaitReady(replacement))
+		cortexServices[i] = replacement
+		requireMemberlistClusterState(t, 3, 3*512, cortexServices...)
+	}
+}
+
 func testSingleBinaryEnv(t *testing.T, tlsEnabled bool, flags map[string]string) {
 	s, err := e2e.NewScenario(networkName)
 	require.NoError(t, err)
@@ -162,6 +250,28 @@ func newSingleBinary(name string, servername string, join string, testFlags map[
 	return serv
 }
 
+func newMigrationSingleBinary(name string, join string, clusterLabel string, verificationDisabled bool) *e2ecortex.CortexService {
+	flags := map[string]string{
+		"-memberlist.abort-if-join-fails":                 "false",
+		"-memberlist.cluster-label-verification-disabled": fmt.Sprintf("%t", verificationDisabled),
+	}
+
+	if clusterLabel != "" {
+		flags["-memberlist.cluster-label"] = clusterLabel
+	}
+
+	return newSingleBinary(name, "", join, flags)
+}
+
+func requireMemberlistClusterState(t *testing.T, expectedMembers, expectedTokens int, services ...*e2ecortex.CortexService) {
+	t.Helper()
+
+	for _, service := range services {
+		require.NoError(t, service.WaitSumMetrics(e2e.Equals(float64(expectedMembers)), "memberlist_client_cluster_members_count"))
+		require.NoError(t, service.WaitSumMetrics(e2e.Equals(float64(expectedTokens)), "cortex_ring_tokens_total"))
+	}
+}
+
 func TestSingleBinaryWithMemberlistScaling(t *testing.T) {
 	s, err := e2e.NewScenario(networkName)
 	require.NoError(t, err)
diff --git a/pkg/ring/kv/memberlist/memberlist_client.go b/pkg/ring/kv/memberlist/memberlist_client.go
index b635a54cee..20be8c1863 100644
--- a/pkg/ring/kv/memberlist/memberlist_client.go
+++ b/pkg/ring/kv/memberlist/memberlist_client.go
@@ -152,6 +152,9 @@ type KVConfig struct {
 	AdvertiseAddr string `yaml:"advertise_addr"`
 	AdvertisePort int    `yaml:"advertise_port"`
 
+	ClusterLabel                     string `yaml:"cluster_label"`
+	ClusterLabelVerificationDisabled bool   `yaml:"cluster_label_verification_disabled"`
+
 	// List of members to join
 	JoinMembers    flagext.StringSlice `yaml:"join_members"`
 	MinJoinBackoff time.Duration       `yaml:"min_join_backoff"`
@@ -209,6 +212,8 @@ func (cfg *KVConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
 	f.BoolVar(&cfg.EnableCompression, prefix+"memberlist.compression-enabled", mlDefaults.EnableCompression, "Enable message compression. This can be used to reduce bandwidth usage at the cost of slightly more CPU utilization.")
 	f.StringVar(&cfg.AdvertiseAddr, prefix+"memberlist.advertise-addr", mlDefaults.AdvertiseAddr, "Gossip address to advertise to other members in the cluster. Used for NAT traversal.")
 	f.IntVar(&cfg.AdvertisePort, prefix+"memberlist.advertise-port", mlDefaults.AdvertisePort, "Gossip port to advertise to other members in the cluster. Used for NAT traversal.")
+	f.StringVar(&cfg.ClusterLabel, prefix+"memberlist.cluster-label", mlDefaults.Label, "The cluster label is an optional string to include in outbound packets and gossip streams. Other members in the memberlist cluster will discard any message whose label doesn't match the configured one, unless the 'cluster-label-verification-disabled' configuration option is set to true.")
+	f.BoolVar(&cfg.ClusterLabelVerificationDisabled, prefix+"memberlist.cluster-label-verification-disabled", mlDefaults.SkipInboundLabelCheck, "When true, memberlist doesn't verify that inbound packets and gossip streams have the cluster label matching the configured one. This verification should be disabled while rolling out the change to the configured cluster label in a live memberlist cluster.")
 	cfg.TCPTransport.RegisterFlagsWithPrefix(f, prefix)
 }
@@ -406,6 +411,8 @@ func (m *KV) buildMemberlistConfig() (*memberlist.Config, error) {
 	mlCfg.AdvertiseAddr = m.cfg.AdvertiseAddr
 	mlCfg.AdvertisePort = m.cfg.AdvertisePort
+	mlCfg.Label = m.cfg.ClusterLabel
+	mlCfg.SkipInboundLabelCheck = m.cfg.ClusterLabelVerificationDisabled
 
 	if m.cfg.NodeName != "" {
 		mlCfg.Name = m.cfg.NodeName
@@ -415,6 +422,10 @@ func (m *KV) buildMemberlistConfig() (*memberlist.Config, error) {
 		level.Info(m.logger).Log("msg", "Using memberlist cluster node name", "name", mlCfg.Name)
 	}
 
+	if mlCfg.Label != "" {
+		level.Info(m.logger).Log("msg", "Using memberlist cluster label", "cluster_label", mlCfg.Label, "skip_inbound_label_check", mlCfg.SkipInboundLabelCheck)
+	}
+
 	mlCfg.LogOutput = newMemberlistLoggerAdapter(m.logger, false)
 	mlCfg.Transport = tr
diff --git a/pkg/ring/kv/memberlist/memberlist_client_test.go b/pkg/ring/kv/memberlist/memberlist_client_test.go
index 002ccb340f..70e575e7bf 100644
--- a/pkg/ring/kv/memberlist/memberlist_client_test.go
+++ b/pkg/ring/kv/memberlist/memberlist_client_test.go
@@ -530,33 +530,156 @@ func TestMultipleCAS(t *testing.T) {
 }
 
 func TestMultipleClients(t *testing.T) {
-	c := dataCodec{}
+	t.Parallel()
+
+	err := testMultipleClientsWithConfigGenerator(t, 10, defaultMultipleClientsKVConfig)
+	require.NoError(t, err)
+}
+
+func TestMultipleClientsWithMixedClusterLabelsAndExpectFailure(t *testing.T) {
+	t.Parallel()
+
+	memberLabels := []string{"", "label1", "label2", "label3", "label4"}
+
+	configGen := func(memberID int) KVConfig {
+		cfg := defaultMultipleClientsKVConfig(memberID)
+		cfg.ClusterLabel = memberLabels[memberID]
+		return cfg
+	}
+
+	err := testMultipleClientsWithConfigGenerator(t, len(memberLabels), configGen)
+	require.Error(t, err)
+	require.Contains(t, err.Error(), "expected to see at least 2 members, got 1")
+}
+
+func TestMultipleClientsWithMixedClusterLabelsAndVerificationDisabled(t *testing.T) {
+	t.Parallel()
+
+	memberLabels := []string{"", "label1", "label2"}
+
+	configGen := func(memberID int) KVConfig {
+		cfg := defaultMultipleClientsKVConfig(memberID)
+		cfg.ClusterLabel = memberLabels[memberID]
+		cfg.ClusterLabelVerificationDisabled = true
+		return cfg
+	}
+
+	err := testMultipleClientsWithConfigGenerator(t, len(memberLabels), configGen)
+	require.NoError(t, err)
+}
+
+func TestMultipleClientsWithSameClusterLabel(t *testing.T) {
+	t.Parallel()
 
 	const members = 10
-	const key = "ring"
+	const clusterLabel = "test-cluster"
 
-	var clients []*Client
+	configGen := func(memberID int) KVConfig {
+		cfg := defaultMultipleClientsKVConfig(memberID)
+		cfg.ClusterLabel = clusterLabel
+		return cfg
+	}
 
-	stop := make(chan struct{})
-	start := make(chan struct{})
+	err := testMultipleClientsWithConfigGenerator(t, members, configGen)
+	require.NoError(t, err)
+}
+
+func TestBuildMemberlistConfigClusterLabelOptions(t *testing.T) {
+	t.Parallel()
+
+	tests := []struct {
+		name                             string
+		clusterLabel                     string
+		clusterLabelVerificationDisabled bool
+	}{
+		{
+			name: "empty label keeps verification enabled by default",
+		},
+		{
+			name:                             "configured label can disable verification",
+			clusterLabel:                     "cluster-a",
+			clusterLabelVerificationDisabled: true,
+		},
+		{
+			name:                             "configured label with verification enabled",
+			clusterLabel:                     "cluster-a",
+			clusterLabelVerificationDisabled: false,
+		},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			var cfg KVConfig
+			flagext.DefaultValues(&cfg)
+			cfg.TCPTransport = TCPTransportConfig{
+				BindAddrs: []string{"localhost"},
+				BindPort:  0,
+			}
+			cfg.ClusterLabel = tc.clusterLabel
+			cfg.ClusterLabelVerificationDisabled = tc.clusterLabelVerificationDisabled
+
+			kv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry())
+
+			mlCfg, err := kv.buildMemberlistConfig()
+			require.NoError(t, err)
+			require.Equal(t, tc.clusterLabel, mlCfg.Label)
+			require.Equal(t, tc.clusterLabelVerificationDisabled, mlCfg.SkipInboundLabelCheck)
+
+			transport, ok := mlCfg.Transport.(*TCPTransport)
+			require.True(t, ok)
+			require.NoError(t, transport.Shutdown())
+		})
+	}
+}
+
+func defaultMultipleClientsKVConfig(memberID int) KVConfig {
+	var cfg KVConfig
+	flagext.DefaultValues(&cfg)
+	cfg.NodeName = fmt.Sprintf("Member-%d", memberID)
+	cfg.GossipInterval = 100 * time.Millisecond
+	cfg.GossipNodes = 3
+	cfg.PushPullInterval = 5 * time.Second
+	cfg.TCPTransport = TCPTransportConfig{
+		BindAddrs: []string{"localhost"},
+		BindPort:  0,
+	}
+
+	return cfg
+}
+
+func testMultipleClientsWithConfigGenerator(t *testing.T, members int, configGen func(memberID int) KVConfig) error {
+	t.Helper()
+
+	c := dataCodec{}
+	const key = "ring"
+
+	clients := make([]*Client, 0, members)
 	port := 0
+	casInterval := time.Second
 
-	for i := range members {
-		id := fmt.Sprintf("Member-%d", i)
-		var cfg KVConfig
-		flagext.DefaultValues(&cfg)
-		cfg.NodeName = id
+	start := make(chan struct{})
+	stop := make(chan struct{})
 
-		cfg.GossipInterval = 100 * time.Millisecond
-		cfg.GossipNodes = 3
-		cfg.PushPullInterval = 5 * time.Second
+	var clientWg sync.WaitGroup
 
-		cfg.TCPTransport = TCPTransportConfig{
-			BindAddrs: []string{"localhost"},
-			BindPort:  0, // randomize ports
+	clientErrCh := make(chan error, members)
+	getClientErr := func() error {
+		select {
+		case err := <-clientErrCh:
+			return err
+		default:
+			return nil
 		}
+	}
+
+	defer func() {
+		close(stop)
+		clientWg.Wait()
+	}()
 
+	for i := range members {
+		cfg := configGen(i)
 		cfg.Codecs = []codec.Codec{c}
 
 		mkv := NewKV(cfg, log.NewNopLogger(), &dnsProviderMock{}, prometheus.NewPedanticRegistry())
@@ -564,29 +687,36 @@ func TestMultipleClients(t *testing.T) {
 		kv, err := NewClient(mkv, c)
 		require.NoError(t, err)
 
 		clients = append(clients, kv)
 
-		go runClient(t, kv, id, key, port, start, stop)
+		clientWg.Add(1)
+		go func(kv *Client, nodeName string, portToConnect int) {
+			defer clientWg.Done()
+
+			if err := runClientWithErr(kv, nodeName, key, portToConnect, casInterval, start, stop); err != nil {
+				clientErrCh <- err
+			}
+		}(kv, cfg.NodeName, port)
 
 		// next KV will connect to this one
 		port = kv.kv.GetListeningPort()
 	}
 
-	println("Waiting before start")
+	t.Log("Waiting before start")
 	time.Sleep(2 * time.Second)
 	close(start)
 
-	println("Observing ring ...")
+	t.Log("Observing ring ...")
 
 	startTime := time.Now()
-	firstKv := clients[0]
-	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
-	updates := 0
-	firstKv.WatchKey(ctx, key, func(in any) bool {
-		updates++
+	firstKV := clients[0]
+	ctx, cancel := context.WithTimeout(context.Background(), casInterval*3)
+	defer cancel()
+	joinedMembers := 0
+	firstKV.WatchKey(ctx, key, func(in any) bool {
 		r := in.(*data)
+		joinedMembers = len(r.Members)
 
 		minTimestamp, maxTimestamp, avgTimestamp := getTimestamps(r.Members)
@@ -595,64 +725,81 @@ func TestMultipleClients(t *testing.T) {
 			"tokens, oldest timestamp:", now.Sub(time.Unix(minTimestamp, 0)).String(),
 			"avg timestamp:", now.Sub(time.Unix(avgTimestamp, 0)).String(),
 			"youngest timestamp:", now.Sub(time.Unix(maxTimestamp, 0)).String())
 
-		return true // yes, keep watching
+		return true
 	})
-	cancel() // make linter happy
 
-	t.Logf("Ring updates observed: %d", updates)
+	if joinedMembers <= 1 {
+		return fmt.Errorf("expected to see at least 2 members, got %d", joinedMembers)
+	}
 
-	if updates < members {
-		// in general, at least one update from each node. (although that's not necessarily true...
-		// but typically we get more updates than that anyway)
-		t.Errorf("expected to see updates, got %d", updates)
+	if err := getClientErr(); err != nil {
+		return err
 	}
 
-	// Let's check all the clients to see if they have relatively up-to-date information
-	// All of them should at least have all the clients
-	// And same tokens.
-	allTokens := []uint32(nil)
+	check := func() error {
+		allTokens := []uint32(nil)
 
-	for i := range members {
-		kv := clients[i]
+		for i, kv := range clients {
+			r := getData(t, kv, key)
+			t.Logf("KV %d: number of known members: %d", i, len(r.Members))
+			if len(r.Members) != members {
+				return fmt.Errorf("member %d has only %d members in the ring", i, len(r.Members))
+			}
 
-		r := getData(t, kv, key)
-		t.Logf("KV %d: number of known members: %d\n", i, len(r.Members))
-		if len(r.Members) != members {
-			t.Errorf("Member %d has only %d members in the ring", i, len(r.Members))
-		}
+			minTimestamp, maxTimestamp, avgTimestamp := getTimestamps(r.Members)
+			for n, ing := range r.Members {
+				if ing.State != ACTIVE {
+					stateStr := "UNKNOWN"
+					switch ing.State {
+					case JOINING:
+						stateStr = "JOINING"
+					case LEFT:
+						stateStr = "LEFT"
+					}
+					return fmt.Errorf("member %d: invalid state of member %s in the ring: %s (%v)", i, n, stateStr, ing.State)
+				}
+			}
 
-		minTimestamp, maxTimestamp, avgTimestamp := getTimestamps(r.Members)
-		for n, ing := range r.Members {
-			if ing.State != ACTIVE {
-				t.Errorf("Member %d: invalid state of member %s in the ring: %v ", i, n, ing.State)
+			now := time.Now()
+			t.Logf("Member %d: oldest: %v, avg: %v, youngest: %v", i,
+				now.Sub(time.Unix(minTimestamp, 0)).String(),
+				now.Sub(time.Unix(avgTimestamp, 0)).String(),
+				now.Sub(time.Unix(maxTimestamp, 0)).String())
+
+			tokens := r.getAllTokens()
+			if allTokens == nil {
+				allTokens = tokens
+				t.Logf("Found tokens: %d", len(allTokens))
+				continue
 			}
-		}
 
-		now := time.Now()
-		t.Logf("Member %d: oldest: %v, avg: %v, youngest: %v", i,
-			now.Sub(time.Unix(minTimestamp, 0)).String(),
-			now.Sub(time.Unix(avgTimestamp, 0)).String(),
-			now.Sub(time.Unix(maxTimestamp, 0)).String())
-
-		tokens := r.getAllTokens()
-		if allTokens == nil {
-			allTokens = tokens
-			t.Logf("Found tokens: %d", len(allTokens))
-		} else {
 			if len(allTokens) != len(tokens) {
-				t.Errorf("Member %d: Expected %d tokens, got %d", i, len(allTokens), len(tokens))
-			} else {
-				for ix, tok := range allTokens {
-					if tok != tokens[ix] {
-						t.Errorf("Member %d: Tokens at position %d differ: %v, %v", i, ix, tok, tokens[ix])
-						break
-					}
+				return fmt.Errorf("member %d: expected %d tokens, got %d", i, len(allTokens), len(tokens))
+			}
+
+			for ix, tok := range allTokens {
+				if tok != tokens[ix] {
+					return fmt.Errorf("member %d: tokens at position %d differ: %v, %v", i, ix, tok, tokens[ix])
 				}
 			}
 		}
+
+		return getClientErr()
 	}
 
-	// We cannot shutdown the KV until now in order for Get() to work reliably.
-	close(stop)
+	ticker := time.NewTicker(100 * time.Millisecond)
+	defer ticker.Stop()
+	timeout := time.After(10 * time.Second)
+	for {
+		select {
+		case <-timeout:
+			return check()
+		case <-ticker.C:
+			if err := check(); err == nil {
+				return nil
+			}
+		}
+	}
 }
 
 func TestJoinMembersWithRetryBackoff(t *testing.T) {
@@ -871,6 +1018,14 @@ func getTimestamps(members map[string]member) (min int64, max int64, avg int64)
 }
 
 func runClient(t *testing.T, kv *Client, name string, ringKey string, portToConnect int, start <-chan struct{}, stop <-chan struct{}) {
+	t.Helper()
+
+	if err := runClientWithErr(kv, name, ringKey, portToConnect, time.Second, start, stop); err != nil {
+		t.Errorf("%v", err)
+	}
+}
+
+func runClientWithErr(kv *Client, name string, ringKey string, portToConnect int, casInterval time.Duration, start <-chan struct{}, stop <-chan struct{}) error {
 	// stop gossipping about the ring(s)
 	defer services.StopAndAwaitTerminated(context.Background(), kv.kv) //nolint:errcheck
 
@@ -883,14 +1038,28 @@ func runClient(t *testing.T, kv *Client, name string, ringKey string, portToConnect int, start <-chan struct{}, stop <-chan struct{}) {
 			if portToConnect > 0 {
 				_, err := kv.kv.JoinMembers([]string{fmt.Sprintf("127.0.0.1:%d", portToConnect)})
 				if err != nil {
-					t.Errorf("%s failed to join the cluster: %v", name, err)
-					return
+					return fmt.Errorf("%s failed to join the cluster: %w", name, err)
 				}
 			}
 		case <-stop:
-			return
-		case <-time.After(1 * time.Second):
-			cas(t, kv, ringKey, updateFn(name))
+			return nil
+		case <-time.After(casInterval):
+			err := kv.CAS(context.Background(), ringKey, func(in any) (out any, retry bool, err error) {
+				var d *data
+				if in != nil {
+					d = in.(*data)
+				}
+
+				updated, retry, err := updateFn(name)(d)
+				if updated == nil {
+					return nil, retry, err
+				}
+
+				return updated, retry, err
+			})
+			if err != nil {
+				return fmt.Errorf("failed to CAS the ring: %w", err)
+			}
 		}
 	}
 }
diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json
index 20cf970c35..fc2ea8ae35 100644
--- a/schemas/cortex-config-schema.json
+++ b/schemas/cortex-config-schema.json
@@ -5600,6 +5600,17 @@
         "type": "number",
         "x-cli-flag": "memberlist.bind-port"
       },
+      "cluster_label": {
+        "description": "The cluster label is an optional string to include in outbound packets and gossip streams. Other members in the memberlist cluster will discard any message whose label doesn't match the configured one, unless the 'cluster-label-verification-disabled' configuration option is set to true.",
+        "type": "string",
+        "x-cli-flag": "memberlist.cluster-label"
+      },
+      "cluster_label_verification_disabled": {
+        "default": false,
+        "description": "When true, memberlist doesn't verify that inbound packets and gossip streams have the cluster label matching the configured one. This verification should be disabled while rolling out the change to the configured cluster label in a live memberlist cluster.",
+        "type": "boolean",
+        "x-cli-flag": "memberlist.cluster-label-verification-disabled"
+      },
       "compression_enabled": {
         "default": true,
         "description": "Enable message compression. This can be used to reduce bandwidth usage at the cost of slightly more CPU utilization.",
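For background, `buildMemberlistConfig` above forwards the two options to hashicorp/memberlist's `Label` and `SkipInboundLabelCheck` config fields. A minimal standalone sketch of that mapping (node setup trimmed to the essentials; the label value is illustrative):

```go
package main

import (
	"fmt"

	"github.com/hashicorp/memberlist"
)

func main() {
	cfg := memberlist.DefaultLANConfig()
	// Equivalent of -memberlist.cluster-label: the label is included in every
	// outbound packet and stream; peers with a different label discard them.
	cfg.Label = "cluster-a"
	// Equivalent of -memberlist.cluster-label-verification-disabled: when true,
	// inbound label checks are skipped, which permits a rolling label migration.
	cfg.SkipInboundLabelCheck = false

	ml, err := memberlist.Create(cfg)
	if err != nil {
		panic(err)
	}
	defer ml.Shutdown() //nolint:errcheck

	fmt.Println("local node:", ml.LocalNode().Name)
}
```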