From a1e6d694540f0d021a37d8755a3bdbb5cb319a25 Mon Sep 17 00:00:00 2001 From: Luke Kysow <1034429+lkysow@users.noreply.github.com> Date: Fri, 22 Jul 2022 15:20:21 -0700 Subject: [PATCH] peering: add config to enable/disable peering (#13867) * peering: add config to enable/disable peering Add config: ``` peering { enabled = true } ``` Defaults to true. When disabled: 1. All peering RPC endpoints will return an error 2. Leader won't start its peering establishment goroutines 3. Leader won't start its peering deletion goroutines --- agent/agent.go | 2 + agent/config/builder.go | 1 + agent/config/config.go | 5 ++ agent/config/default.go | 3 + agent/config/runtime.go | 8 ++ agent/config/runtime_test.go | 11 +++ .../TestRuntimeConfig_Sanitize.golden | 1 + agent/config/testdata/full-config.hcl | 3 + agent/config/testdata/full-config.json | 3 + agent/consul/config.go | 5 ++ agent/consul/leader.go | 8 +- agent/consul/leader_peering_test.go | 86 +++++++++++++++++++ agent/consul/server.go | 1 + agent/rpc/peering/service.go | 35 ++++++++ agent/rpc/peering/service_test.go | 63 ++++++++++++++ docs/config/checklist-adding-config-fields.md | 2 + .../docs/agent/config/config-files.mdx | 9 ++ 17 files changed, 244 insertions(+), 2 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index a7c89a7270..197434e77c 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -1341,6 +1341,8 @@ func newConsulConfig(runtimeCfg *config.RuntimeConfig, logger hclog.Logger) (*co // function does not drift. 
cfg.SerfLANConfig = consul.CloneSerfLANConfig(cfg.SerfLANConfig) + cfg.PeeringEnabled = runtimeCfg.PeeringEnabled + enterpriseConsulConfig(cfg, runtimeCfg) return cfg, nil } diff --git a/agent/config/builder.go b/agent/config/builder.go index f855aae516..70c5d044cb 100644 --- a/agent/config/builder.go +++ b/agent/config/builder.go @@ -1014,6 +1014,7 @@ func (b *builder) build() (rt RuntimeConfig, err error) { NodeMeta: c.NodeMeta, NodeName: b.nodeName(c.NodeName), ReadReplica: boolVal(c.ReadReplica), + PeeringEnabled: boolVal(c.Peering.Enabled), PidFile: stringVal(c.PidFile), PrimaryDatacenter: primaryDatacenter, PrimaryGateways: b.expandAllOptionalAddrs("primary_gateways", c.PrimaryGateways), diff --git a/agent/config/config.go b/agent/config/config.go index c4f752a823..23e7550aa2 100644 --- a/agent/config/config.go +++ b/agent/config/config.go @@ -197,6 +197,7 @@ type Config struct { NodeID *string `mapstructure:"node_id"` NodeMeta map[string]string `mapstructure:"node_meta"` NodeName *string `mapstructure:"node_name"` + Peering Peering `mapstructure:"peering"` Performance Performance `mapstructure:"performance"` PidFile *string `mapstructure:"pid_file"` Ports Ports `mapstructure:"ports"` @@ -887,3 +888,7 @@ type TLS struct { // config merging logic. 
GRPCModifiedByDeprecatedConfig *struct{} `mapstructure:"-"` } + +type Peering struct { + Enabled *bool `mapstructure:"enabled"` +} diff --git a/agent/config/default.go b/agent/config/default.go index 951d9f1263..d0cc2865dc 100644 --- a/agent/config/default.go +++ b/agent/config/default.go @@ -104,6 +104,9 @@ func DefaultSource() Source { kv_max_value_size = ` + strconv.FormatInt(raft.SuggestedMaxDataSize, 10) + ` txn_max_req_len = ` + strconv.FormatInt(raft.SuggestedMaxDataSize, 10) + ` } + peering = { + enabled = true + } performance = { leave_drain_time = "5s" raft_multiplier = ` + strconv.Itoa(int(consul.DefaultRaftMultiplier)) + ` diff --git a/agent/config/runtime.go b/agent/config/runtime.go index 2ae9888ae0..db46c21849 100644 --- a/agent/config/runtime.go +++ b/agent/config/runtime.go @@ -810,6 +810,14 @@ type RuntimeConfig struct { // flag: -non-voting-server ReadReplica bool + // PeeringEnabled enables cluster peering. This setting only applies for servers. + // When disabled, all peering RPC endpoints will return errors, + // peering requests from other clusters will receive errors, and any peerings already stored in this server's + // state will be ignored. + // + // hcl: peering { enabled = (true|false) } + PeeringEnabled bool + // PidFile is the file to store our PID in. // // hcl: pid_file = string diff --git a/agent/config/runtime_test.go b/agent/config/runtime_test.go index 0963ec07f7..b05b314919 100644 --- a/agent/config/runtime_test.go +++ b/agent/config/runtime_test.go @@ -5548,6 +5548,16 @@ func TestLoad_IntegrationWithFlags(t *testing.T) { "tls.grpc was provided but TLS will NOT be enabled on the gRPC listener without an HTTPS listener configured (e.g. 
via ports.https)", }, }) + run(t, testCase{ + desc: "peering.enabled defaults to true", + args: []string{ + `-data-dir=` + dataDir, + }, + expected: func(rt *RuntimeConfig) { + rt.DataDir = dataDir + rt.PeeringEnabled = true + }, + }) } func (tc testCase) run(format string, dataDir string) func(t *testing.T) { @@ -5955,6 +5965,7 @@ func TestLoad_FullConfig(t *testing.T) { NodeMeta: map[string]string{"5mgGQMBk": "mJLtVMSG", "A7ynFMJB": "0Nx6RGab"}, NodeName: "otlLxGaI", ReadReplica: true, + PeeringEnabled: true, PidFile: "43xN80Km", PrimaryGateways: []string{"aej8eeZo", "roh2KahS"}, PrimaryGatewaysInterval: 18866 * time.Second, diff --git a/agent/config/testdata/TestRuntimeConfig_Sanitize.golden b/agent/config/testdata/TestRuntimeConfig_Sanitize.golden index 25fbba0c0e..b5d72f864e 100644 --- a/agent/config/testdata/TestRuntimeConfig_Sanitize.golden +++ b/agent/config/testdata/TestRuntimeConfig_Sanitize.golden @@ -235,6 +235,7 @@ "NodeID": "", "NodeMeta": {}, "NodeName": "", + "PeeringEnabled": false, "PidFile": "", "PrimaryDatacenter": "", "PrimaryGateways": [ diff --git a/agent/config/testdata/full-config.hcl b/agent/config/testdata/full-config.hcl index bb544b54a5..ed8203296c 100644 --- a/agent/config/testdata/full-config.hcl +++ b/agent/config/testdata/full-config.hcl @@ -305,6 +305,9 @@ node_meta { node_name = "otlLxGaI" non_voting_server = true partition = "" +peering { + enabled = true +} performance { leave_drain_time = "8265s" raft_multiplier = 5 diff --git a/agent/config/testdata/full-config.json b/agent/config/testdata/full-config.json index 36f52e6814..8294a27b7c 100644 --- a/agent/config/testdata/full-config.json +++ b/agent/config/testdata/full-config.json @@ -305,6 +305,9 @@ "node_name": "otlLxGaI", "non_voting_server": true, "partition": "", + "peering": { + "enabled": true + }, "performance": { "leave_drain_time": "8265s", "raft_multiplier": 5, diff --git a/agent/consul/config.go b/agent/consul/config.go index 50235c6814..469ccc9191 100644 --- 
a/agent/consul/config.go +++ b/agent/consul/config.go @@ -396,6 +396,9 @@ type Config struct { RaftBoltDBConfig RaftBoltDBConfig + // PeeringEnabled enables cluster peering. + PeeringEnabled bool + // Embedded Consul Enterprise specific configuration *EnterpriseConfig } @@ -512,6 +515,8 @@ func DefaultConfig() *Config { DefaultQueryTime: 300 * time.Second, MaxQueryTime: 600 * time.Second, + PeeringEnabled: true, + EnterpriseConfig: DefaultEnterpriseConfig(), } diff --git a/agent/consul/leader.go b/agent/consul/leader.go index eb197deb3e..389b790569 100644 --- a/agent/consul/leader.go +++ b/agent/consul/leader.go @@ -315,7 +315,9 @@ func (s *Server) establishLeadership(ctx context.Context) error { s.startFederationStateAntiEntropy(ctx) - s.startPeeringStreamSync(ctx) + if s.config.PeeringEnabled { + s.startPeeringStreamSync(ctx) + } s.startDeferredDeletion(ctx) @@ -758,7 +760,9 @@ func (s *Server) stopACLReplication() { } func (s *Server) startDeferredDeletion(ctx context.Context) { - s.startPeeringDeferredDeletion(ctx) + if s.config.PeeringEnabled { + s.startPeeringDeferredDeletion(ctx) + } s.startTenancyDeferredDeletion(ctx) } diff --git a/agent/consul/leader_peering_test.go b/agent/consul/leader_peering_test.go index 06cbda43d9..feaf5be027 100644 --- a/agent/consul/leader_peering_test.go +++ b/agent/consul/leader_peering_test.go @@ -1036,3 +1036,89 @@ func TestLeader_PeeringMetrics_emitPeeringMetrics(t *testing.T) { require.Equal(r, float32(2), metric2.Value) // for d, e services }) } + +// Test that the leader doesn't start its peering deletion routine when +peering is disabled. 
+func TestLeader_Peering_NoDeletionWhenPeeringDisabled(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + _, s1 := testServerWithConfig(t, func(c *Config) { + c.NodeName = "s1.dc1" + c.Datacenter = "dc1" + c.TLSConfig.Domain = "consul" + c.PeeringEnabled = false + }) + testrpc.WaitForLeader(t, s1.RPC, "dc1") + + var ( + peerID = "cc56f0b8-3885-4e78-8d7b-614a0c45712d" + peerName = "my-peer-s2" + lastIdx = uint64(0) + ) + + // Simulate a peering initiation event by writing a peering to the state store. + lastIdx++ + require.NoError(t, s1.fsm.State().PeeringWrite(lastIdx, &pbpeering.Peering{ + ID: peerID, + Name: peerName, + })) + + // Mark the peering for deletion to trigger the termination sequence. + lastIdx++ + require.NoError(t, s1.fsm.State().PeeringWrite(lastIdx, &pbpeering.Peering{ + ID: peerID, + Name: peerName, + DeletedAt: structs.TimeToProto(time.Now()), + })) + + // The leader routine shouldn't be running so the peering should never get deleted. + require.Never(t, func() bool { + _, peering, err := s1.fsm.State().PeeringRead(nil, state.Query{ + Value: peerName, + }) + if err != nil { + t.Logf("unexpected err: %s", err) + return true + } + if peering == nil { + return true + } + return false + }, 7*time.Second, 1*time.Second, "peering should not have been deleted") +} + +// Test that the leader doesn't start its peering establishment routine +// when peering is disabled. +func TestLeader_Peering_NoEstablishmentWhenPeeringDisabled(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + _, s1 := testServerWithConfig(t, func(c *Config) { + c.NodeName = "s1.dc1" + c.Datacenter = "dc1" + c.TLSConfig.Domain = "consul" + c.PeeringEnabled = false + }) + testrpc.WaitForLeader(t, s1.RPC, "dc1") + + var ( + peerID = "cc56f0b8-3885-4e78-8d7b-614a0c45712d" + peerName = "my-peer-s2" + lastIdx = uint64(0) + ) + + // Simulate a peering initiation event by writing a peering to the state store. 
+ require.NoError(t, s1.fsm.State().PeeringWrite(lastIdx, &pbpeering.Peering{ + ID: peerID, + Name: peerName, + PeerServerAddresses: []string{"1.2.3.4"}, + })) + + require.Never(t, func() bool { + _, found := s1.peerStreamTracker.StreamStatus(peerID) + return found + }, 7*time.Second, 1*time.Second, "peering should not have been established") +} diff --git a/agent/consul/server.go b/agent/consul/server.go index 3c240c5f7d..a14253d803 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -794,6 +794,7 @@ func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler }, Datacenter: config.Datacenter, ConnectEnabled: config.ConnectEnabled, + PeeringEnabled: config.PeeringEnabled, }) s.peeringServer = p diff --git a/agent/rpc/peering/service.go b/agent/rpc/peering/service.go index 1d0d219f6c..a8ca3b199e 100644 --- a/agent/rpc/peering/service.go +++ b/agent/rpc/peering/service.go @@ -56,6 +56,7 @@ type Config struct { ForwardRPC func(structs.RPCInfo, func(*grpc.ClientConn) error) (bool, error) Datacenter string ConnectEnabled bool + PeeringEnabled bool } func NewServer(cfg Config) *Server { @@ -139,6 +140,8 @@ type Store interface { TrustBundleListByService(ws memdb.WatchSet, service, dc string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error) } +var peeringNotEnabledErr = grpcstatus.Error(codes.FailedPrecondition, "peering must be enabled to use this endpoint") + // GenerateToken implements the PeeringService RPC method to generate a // peering token which is the initial step in establishing a peering relationship // with other Consul clusters. 
@@ -146,6 +149,10 @@ func (s *Server) GenerateToken( ctx context.Context, req *pbpeering.GenerateTokenRequest, ) (*pbpeering.GenerateTokenResponse, error) { + if !s.Config.PeeringEnabled { + return nil, peeringNotEnabledErr + } + if err := s.Backend.EnterpriseCheckPartitions(req.Partition); err != nil { return nil, grpcstatus.Error(codes.InvalidArgument, err.Error()) } @@ -251,6 +258,10 @@ func (s *Server) Establish( ctx context.Context, req *pbpeering.EstablishRequest, ) (*pbpeering.EstablishResponse, error) { + if !s.Config.PeeringEnabled { + return nil, peeringNotEnabledErr + } + // validate prior to forwarding to the leader, this saves a network hop if err := dns.ValidateLabel(req.PeerName); err != nil { return nil, fmt.Errorf("%s is not a valid peer name: %w", req.PeerName, err) @@ -316,6 +327,10 @@ func (s *Server) Establish( } func (s *Server) PeeringRead(ctx context.Context, req *pbpeering.PeeringReadRequest) (*pbpeering.PeeringReadResponse, error) { + if !s.Config.PeeringEnabled { + return nil, peeringNotEnabledErr + } + if err := s.Backend.EnterpriseCheckPartitions(req.Partition); err != nil { return nil, grpcstatus.Error(codes.InvalidArgument, err.Error()) } @@ -350,6 +365,10 @@ func (s *Server) PeeringRead(ctx context.Context, req *pbpeering.PeeringReadRequ } func (s *Server) PeeringList(ctx context.Context, req *pbpeering.PeeringListRequest) (*pbpeering.PeeringListResponse, error) { + if !s.Config.PeeringEnabled { + return nil, peeringNotEnabledErr + } + if err := s.Backend.EnterpriseCheckPartitions(req.Partition); err != nil { return nil, grpcstatus.Error(codes.InvalidArgument, err.Error()) } @@ -413,6 +432,10 @@ func (s *Server) reconcilePeering(peering *pbpeering.Peering) *pbpeering.Peering // TODO(peering): As of writing, this method is only used in tests to set up Peerings in the state store. 
// Consider removing if we can find another way to populate state store in peering_endpoint_test.go func (s *Server) PeeringWrite(ctx context.Context, req *pbpeering.PeeringWriteRequest) (*pbpeering.PeeringWriteResponse, error) { + if !s.Config.PeeringEnabled { + return nil, peeringNotEnabledErr + } + if err := s.Backend.EnterpriseCheckPartitions(req.Peering.Partition); err != nil { return nil, grpcstatus.Error(codes.InvalidArgument, err.Error()) } @@ -449,6 +472,10 @@ func (s *Server) PeeringWrite(ctx context.Context, req *pbpeering.PeeringWriteRe } func (s *Server) PeeringDelete(ctx context.Context, req *pbpeering.PeeringDeleteRequest) (*pbpeering.PeeringDeleteResponse, error) { + if !s.Config.PeeringEnabled { + return nil, peeringNotEnabledErr + } + if err := s.Backend.EnterpriseCheckPartitions(req.Partition); err != nil { return nil, grpcstatus.Error(codes.InvalidArgument, err.Error()) } @@ -505,6 +532,10 @@ func (s *Server) PeeringDelete(ctx context.Context, req *pbpeering.PeeringDelete } func (s *Server) TrustBundleRead(ctx context.Context, req *pbpeering.TrustBundleReadRequest) (*pbpeering.TrustBundleReadResponse, error) { + if !s.Config.PeeringEnabled { + return nil, peeringNotEnabledErr + } + if err := s.Backend.EnterpriseCheckPartitions(req.Partition); err != nil { return nil, grpcstatus.Error(codes.InvalidArgument, err.Error()) } @@ -540,6 +571,10 @@ func (s *Server) TrustBundleRead(ctx context.Context, req *pbpeering.TrustBundle // TODO(peering): rename rpc & request/response to drop the "service" part func (s *Server) TrustBundleListByService(ctx context.Context, req *pbpeering.TrustBundleListByServiceRequest) (*pbpeering.TrustBundleListByServiceResponse, error) { + if !s.Config.PeeringEnabled { + return nil, peeringNotEnabledErr + } + if err := s.Backend.EnterpriseCheckPartitions(req.Partition); err != nil { return nil, grpcstatus.Error(codes.InvalidArgument, err.Error()) } diff --git a/agent/rpc/peering/service_test.go 
b/agent/rpc/peering/service_test.go index 939a304d2a..c7e37c91d8 100644 --- a/agent/rpc/peering/service_test.go +++ b/agent/rpc/peering/service_test.go @@ -15,6 +15,8 @@ import ( "github.com/hashicorp/go-uuid" "github.com/stretchr/testify/require" gogrpc "google.golang.org/grpc" + "google.golang.org/grpc/codes" + grpcstatus "google.golang.org/grpc/status" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul" @@ -529,6 +531,67 @@ func TestPeeringService_TrustBundleListByService(t *testing.T) { require.Equal(t, []string{"foo-root-1"}, resp.Bundles[1].RootPEMs) } +// Test RPC endpoint responses when peering is disabled. They should all return an error. +func TestPeeringService_PeeringDisabled(t *testing.T) { + // TODO(peering): see note on newTestServer, refactor to not use this + s := newTestServer(t, func(c *consul.Config) { c.PeeringEnabled = false }) + client := pbpeering.NewPeeringServiceClient(s.ClientConn(t)) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + t.Cleanup(cancel) + + // assertFailedResponse is a helper function that checks the error from a gRPC + // response is what we expect when peering is disabled. + assertFailedResponse := func(t *testing.T, err error) { + actErr, ok := grpcstatus.FromError(err) + require.True(t, ok) + require.Equal(t, codes.FailedPrecondition, actErr.Code()) + require.Equal(t, "peering must be enabled to use this endpoint", actErr.Message()) + } + + // Test all the endpoints. 
+ + t.Run("PeeringWrite", func(t *testing.T) { + _, err := client.PeeringWrite(ctx, &pbpeering.PeeringWriteRequest{}) + assertFailedResponse(t, err) + }) + + t.Run("PeeringRead", func(t *testing.T) { + _, err := client.PeeringRead(ctx, &pbpeering.PeeringReadRequest{}) + assertFailedResponse(t, err) + }) + + t.Run("PeeringDelete", func(t *testing.T) { + _, err := client.PeeringDelete(ctx, &pbpeering.PeeringDeleteRequest{}) + assertFailedResponse(t, err) + }) + + t.Run("PeeringList", func(t *testing.T) { + _, err := client.PeeringList(ctx, &pbpeering.PeeringListRequest{}) + assertFailedResponse(t, err) + }) + + t.Run("Establish", func(t *testing.T) { + _, err := client.Establish(ctx, &pbpeering.EstablishRequest{}) + assertFailedResponse(t, err) + }) + + t.Run("GenerateToken", func(t *testing.T) { + _, err := client.GenerateToken(ctx, &pbpeering.GenerateTokenRequest{}) + assertFailedResponse(t, err) + }) + + t.Run("TrustBundleRead", func(t *testing.T) { + _, err := client.TrustBundleRead(ctx, &pbpeering.TrustBundleReadRequest{}) + assertFailedResponse(t, err) + }) + + t.Run("TrustBundleListByService", func(t *testing.T) { + _, err := client.TrustBundleListByService(ctx, &pbpeering.TrustBundleListByServiceRequest{}) + assertFailedResponse(t, err) + }) +} + // newTestServer is copied from partition/service_test.go, with the addition of certs/cas. // TODO(peering): these are endpoint tests and should live in the agent/consul // package. Instead, these can be written around a mock client (see testing.go) diff --git a/docs/config/checklist-adding-config-fields.md b/docs/config/checklist-adding-config-fields.md index e171394115..7a47eb8415 100644 --- a/docs/config/checklist-adding-config-fields.md +++ b/docs/config/checklist-adding-config-fields.md @@ -45,6 +45,8 @@ There are four specific cases covered with increasing complexity: - [ ] Add that to `DefaultSource` in `agent/config/defaults.go`. 
- [ ] Add a test case to the table test `TestLoad_IntegrationWithFlags` in `agent/config/runtime_test.go`. + - [ ] If the config needs to be defaulted for the test server used in unit tests, + also add it to `DefaultConfig()` in `agent/consul/config.go`. - [ ] **If** your config should take effect on a reload/HUP. - [ ] Add necessary code to to trigger a safe (locked or atomic) update to any state the feature needs changing. This needs to be added to one or diff --git a/website/content/docs/agent/config/config-files.mdx b/website/content/docs/agent/config/config-files.mdx index a8eaba6d5a..6b902987ee 100644 --- a/website/content/docs/agent/config/config-files.mdx +++ b/website/content/docs/agent/config/config-files.mdx @@ -551,6 +551,15 @@ Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'." - `max_query_time` Equivalent to the [`-max-query-time` command-line flag](/docs/agent/config/cli-flags#_max_query_time). +- `peering` This object allows setting options for cluster peering. + + The following sub-keys are available: + + - `enabled` ((#peering_enabled)) (Defaults to `true`) Controls whether cluster peering is enabled. + Has no effect on Consul clients, only on Consul servers. When disabled, all peering APIs will return + an error, any peerings stored in Consul already will be ignored (but they will not be deleted), + and all peering connections from other clusters will be rejected. This was added in Consul 1.13.0. + - `partition` - This flag is used to set the name of the admin partition the agent belongs to. An agent can only join and communicate with other agents within its admin partition. Review the