peering: add config to enable/disable peering (#13867)

* peering: add config to enable/disable peering

Add config:

```
peering {
  enabled = true
}
```
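
The same stanza for agents configured with JSON (mirroring the full-config JSON test data further down) would be:

```
{
  "peering": {
    "enabled": true
  }
}
```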

Defaults to true. When disabled:
1. All peering RPC endpoints will return an error (see the client sketch below)
2. Leader won't start its peering establishment goroutines
3. Leader won't start its peering deletion goroutines
Luke Kysow 2022-07-22 15:20:21 -07:00 committed by GitHub
parent 0786517b56
commit a1e6d69454
17 changed files with 244 additions and 2 deletions
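
All of the gated RPC endpoints below fail with `codes.FailedPrecondition` and the message from `peeringNotEnabledErr`, so callers can detect the disabled state explicitly. A minimal client-side sketch, not part of this commit — the agent address, dial options, and `PeerName` value are assumptions:

```
package main

import (
	"context"
	"fmt"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	grpcstatus "google.golang.org/grpc/status"

	"github.com/hashicorp/consul/proto/pbpeering"
)

func main() {
	// Assumption: a Consul server exposing gRPC on the default port 8502.
	conn, err := grpc.Dial("127.0.0.1:8502", grpc.WithInsecure())
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	client := pbpeering.NewPeeringServiceClient(conn)
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	_, err = client.GenerateToken(ctx, &pbpeering.GenerateTokenRequest{PeerName: "my-peer"})
	if st, ok := grpcstatus.FromError(err); ok && st.Code() == codes.FailedPrecondition {
		// Matches the guard added in the peering service diff below.
		fmt.Println("peering disabled on this server:", st.Message())
	}
}
```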


@@ -1341,6 +1341,8 @@ func newConsulConfig(runtimeCfg *config.RuntimeConfig, logger hclog.Logger) (*co
// function does not drift.
cfg.SerfLANConfig = consul.CloneSerfLANConfig(cfg.SerfLANConfig)
cfg.PeeringEnabled = runtimeCfg.PeeringEnabled
enterpriseConsulConfig(cfg, runtimeCfg)
return cfg, nil
}


@@ -1014,6 +1014,7 @@ func (b *builder) build() (rt RuntimeConfig, err error) {
NodeMeta: c.NodeMeta,
NodeName: b.nodeName(c.NodeName),
ReadReplica: boolVal(c.ReadReplica),
PeeringEnabled: boolVal(c.Peering.Enabled),
PidFile: stringVal(c.PidFile),
PrimaryDatacenter: primaryDatacenter,
PrimaryGateways: b.expandAllOptionalAddrs("primary_gateways", c.PrimaryGateways),


@@ -197,6 +197,7 @@ type Config struct {
NodeID *string `mapstructure:"node_id"`
NodeMeta map[string]string `mapstructure:"node_meta"`
NodeName *string `mapstructure:"node_name"`
Peering Peering `mapstructure:"peering"`
Performance Performance `mapstructure:"performance"`
PidFile *string `mapstructure:"pid_file"`
Ports Ports `mapstructure:"ports"`
@@ -887,3 +888,7 @@ type TLS struct {
// config merging logic.
GRPCModifiedByDeprecatedConfig *struct{} `mapstructure:"-"`
}
type Peering struct {
Enabled *bool `mapstructure:"enabled"`
}
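
`Enabled` is a `*bool` rather than a plain `bool` so the builder can tell "never set" apart from an explicit `false`: when no config file sets it, the `enabled = true` entry contributed by `DefaultSource` (next hunk) is what gets merged in. A sketch of the unwrap step — the body of `boolVal` is assumed here to match Consul's usual pointer helpers:

```
package config

// Hedged sketch of boolVal as used in the builder hunk above. By the time
// the builder runs, all sources (user files, then DefaultSource) have been
// merged, so a nil pointer means no source set the field at all.
func boolVal(v *bool) bool {
	if v == nil {
		return false
	}
	return *v
}
```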


@@ -104,6 +104,9 @@ func DefaultSource() Source {
kv_max_value_size = ` + strconv.FormatInt(raft.SuggestedMaxDataSize, 10) + `
txn_max_req_len = ` + strconv.FormatInt(raft.SuggestedMaxDataSize, 10) + `
}
peering = {
enabled = true
}
performance = {
leave_drain_time = "5s"
raft_multiplier = ` + strconv.Itoa(int(consul.DefaultRaftMultiplier)) + `


@@ -810,6 +810,14 @@ type RuntimeConfig struct {
// flag: -non-voting-server
ReadReplica bool
// PeeringEnabled enables cluster peering. This setting only applies for servers.
// When disabled, all peering RPC endpoints will return errors,
// peering requests from other clusters will receive errors, and any peerings already stored in this server's
// state will be ignored.
//
// hcl: peering { enabled = (true|false) }
PeeringEnabled bool
// PidFile is the file to store our PID in.
//
// hcl: pid_file = string


@@ -5548,6 +5548,16 @@ func TestLoad_IntegrationWithFlags(t *testing.T) {
"tls.grpc was provided but TLS will NOT be enabled on the gRPC listener without an HTTPS listener configured (e.g. via ports.https)",
},
})
run(t, testCase{
desc: "peering.enabled defaults to true",
args: []string{
`-data-dir=` + dataDir,
},
expected: func(rt *RuntimeConfig) {
rt.DataDir = dataDir
rt.PeeringEnabled = true
},
})
}
func (tc testCase) run(format string, dataDir string) func(t *testing.T) {
@@ -5955,6 +5965,7 @@ func TestLoad_FullConfig(t *testing.T) {
NodeMeta: map[string]string{"5mgGQMBk": "mJLtVMSG", "A7ynFMJB": "0Nx6RGab"},
NodeName: "otlLxGaI",
ReadReplica: true,
PeeringEnabled: true,
PidFile: "43xN80Km",
PrimaryGateways: []string{"aej8eeZo", "roh2KahS"},
PrimaryGatewaysInterval: 18866 * time.Second,


@@ -235,6 +235,7 @@
"NodeID": "",
"NodeMeta": {},
"NodeName": "",
"PeeringEnabled": false,
"PidFile": "",
"PrimaryDatacenter": "",
"PrimaryGateways": [


@@ -305,6 +305,9 @@ node_meta {
node_name = "otlLxGaI"
non_voting_server = true
partition = ""
peering {
enabled = true
}
performance {
leave_drain_time = "8265s"
raft_multiplier = 5


@@ -305,6 +305,9 @@
"node_name": "otlLxGaI",
"non_voting_server": true,
"partition": "",
"peering": {
"enabled": true
},
"performance": {
"leave_drain_time": "8265s",
"raft_multiplier": 5,


@@ -396,6 +396,9 @@ type Config struct {
RaftBoltDBConfig RaftBoltDBConfig
// PeeringEnabled enables cluster peering.
PeeringEnabled bool
// Embedded Consul Enterprise specific configuration
*EnterpriseConfig
}
@@ -512,6 +515,8 @@ func DefaultConfig() *Config {
DefaultQueryTime: 300 * time.Second,
MaxQueryTime: 600 * time.Second,
PeeringEnabled: true,
EnterpriseConfig: DefaultEnterpriseConfig(),
}


@@ -315,7 +315,9 @@ func (s *Server) establishLeadership(ctx context.Context) error {
 	s.startFederationStateAntiEntropy(ctx)
-	s.startPeeringStreamSync(ctx)
+	if s.config.PeeringEnabled {
+		s.startPeeringStreamSync(ctx)
+	}
 	s.startDeferredDeletion(ctx)
@@ -758,7 +760,9 @@ func (s *Server) stopACLReplication() {
}
 func (s *Server) startDeferredDeletion(ctx context.Context) {
-	s.startPeeringDeferredDeletion(ctx)
+	if s.config.PeeringEnabled {
+		s.startPeeringDeferredDeletion(ctx)
+	}
 	s.startTenancyDeferredDeletion(ctx)
 }


@@ -1036,3 +1036,89 @@ func TestLeader_PeeringMetrics_emitPeeringMetrics(t *testing.T) {
require.Equal(r, float32(2), metric2.Value) // for d, e services
})
}
// Test that the leader doesn't start its peering deletion routine when
// peering is disabled.
func TestLeader_Peering_NoDeletionWhenPeeringDisabled(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
_, s1 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "s1.dc1"
c.Datacenter = "dc1"
c.TLSConfig.Domain = "consul"
c.PeeringEnabled = false
})
testrpc.WaitForLeader(t, s1.RPC, "dc1")
var (
peerID = "cc56f0b8-3885-4e78-8d7b-614a0c45712d"
peerName = "my-peer-s2"
lastIdx = uint64(0)
)
// Simulate a peering initiation event by writing a peering to the state store.
lastIdx++
require.NoError(t, s1.fsm.State().PeeringWrite(lastIdx, &pbpeering.Peering{
ID: peerID,
Name: peerName,
}))
// Mark the peering for deletion to trigger the termination sequence.
lastIdx++
require.NoError(t, s1.fsm.State().PeeringWrite(lastIdx, &pbpeering.Peering{
ID: peerID,
Name: peerName,
DeletedAt: structs.TimeToProto(time.Now()),
}))
// The leader routine shouldn't be running, so the peering should never get deleted.
require.Never(t, func() bool {
_, peering, err := s1.fsm.State().PeeringRead(nil, state.Query{
Value: peerName,
})
if err != nil {
t.Logf("unexpected err: %s", err)
return true
}
if peering == nil {
return true
}
return false
}, 7*time.Second, 1*time.Second, "peering should not have been deleted")
}
// Test that the leader doesn't start its peering establishment routine
// when peering is disabled.
func TestLeader_Peering_NoEstablishmentWhenPeeringDisabled(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
_, s1 := testServerWithConfig(t, func(c *Config) {
c.NodeName = "s1.dc1"
c.Datacenter = "dc1"
c.TLSConfig.Domain = "consul"
c.PeeringEnabled = false
})
testrpc.WaitForLeader(t, s1.RPC, "dc1")
var (
peerID = "cc56f0b8-3885-4e78-8d7b-614a0c45712d"
peerName = "my-peer-s2"
lastIdx = uint64(0)
)
// Simulate a peering initiation event by writing a peering to the state store.
lastIdx++
require.NoError(t, s1.fsm.State().PeeringWrite(lastIdx, &pbpeering.Peering{
ID: peerID,
Name: peerName,
PeerServerAddresses: []string{"1.2.3.4"},
}))
require.Never(t, func() bool {
_, found := s1.peerStreamTracker.StreamStatus(peerID)
return found
}, 7*time.Second, 1*time.Second, "peering should not have been established")
}


@@ -794,6 +794,7 @@ func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler
},
Datacenter: config.Datacenter,
ConnectEnabled: config.ConnectEnabled,
PeeringEnabled: config.PeeringEnabled,
})
s.peeringServer = p


@@ -56,6 +56,7 @@ type Config struct {
ForwardRPC func(structs.RPCInfo, func(*grpc.ClientConn) error) (bool, error)
Datacenter string
ConnectEnabled bool
PeeringEnabled bool
}
func NewServer(cfg Config) *Server {
@@ -139,6 +140,8 @@ type Store interface {
TrustBundleListByService(ws memdb.WatchSet, service, dc string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error)
}
var peeringNotEnabledErr = grpcstatus.Error(codes.FailedPrecondition, "peering must be enabled to use this endpoint")
// GenerateToken implements the PeeringService RPC method to generate a
// peering token which is the initial step in establishing a peering relationship
// with other Consul clusters.
@@ -146,6 +149,10 @@ func (s *Server) GenerateToken(
ctx context.Context,
req *pbpeering.GenerateTokenRequest,
) (*pbpeering.GenerateTokenResponse, error) {
if !s.Config.PeeringEnabled {
return nil, peeringNotEnabledErr
}
if err := s.Backend.EnterpriseCheckPartitions(req.Partition); err != nil {
return nil, grpcstatus.Error(codes.InvalidArgument, err.Error())
}
@@ -251,6 +258,10 @@ func (s *Server) Establish(
ctx context.Context,
req *pbpeering.EstablishRequest,
) (*pbpeering.EstablishResponse, error) {
if !s.Config.PeeringEnabled {
return nil, peeringNotEnabledErr
}
// validate prior to forwarding to the leader, this saves a network hop
if err := dns.ValidateLabel(req.PeerName); err != nil {
return nil, fmt.Errorf("%s is not a valid peer name: %w", req.PeerName, err)
@@ -316,6 +327,10 @@ func (s *Server) Establish(
}
func (s *Server) PeeringRead(ctx context.Context, req *pbpeering.PeeringReadRequest) (*pbpeering.PeeringReadResponse, error) {
if !s.Config.PeeringEnabled {
return nil, peeringNotEnabledErr
}
if err := s.Backend.EnterpriseCheckPartitions(req.Partition); err != nil {
return nil, grpcstatus.Error(codes.InvalidArgument, err.Error())
}
@@ -350,6 +365,10 @@ func (s *Server) PeeringRead(ctx context.Context, req *pbpeering.PeeringReadRequ
}
func (s *Server) PeeringList(ctx context.Context, req *pbpeering.PeeringListRequest) (*pbpeering.PeeringListResponse, error) {
if !s.Config.PeeringEnabled {
return nil, peeringNotEnabledErr
}
if err := s.Backend.EnterpriseCheckPartitions(req.Partition); err != nil {
return nil, grpcstatus.Error(codes.InvalidArgument, err.Error())
}
@@ -413,6 +432,10 @@ func (s *Server) reconcilePeering(peering *pbpeering.Peering) *pbpeering.Peering
// TODO(peering): As of writing, this method is only used in tests to set up Peerings in the state store.
// Consider removing if we can find another way to populate state store in peering_endpoint_test.go
func (s *Server) PeeringWrite(ctx context.Context, req *pbpeering.PeeringWriteRequest) (*pbpeering.PeeringWriteResponse, error) {
if !s.Config.PeeringEnabled {
return nil, peeringNotEnabledErr
}
if err := s.Backend.EnterpriseCheckPartitions(req.Peering.Partition); err != nil {
return nil, grpcstatus.Error(codes.InvalidArgument, err.Error())
}
@@ -449,6 +472,10 @@ func (s *Server) PeeringWrite(ctx context.Context, req *pbpeering.PeeringWriteRe
}
func (s *Server) PeeringDelete(ctx context.Context, req *pbpeering.PeeringDeleteRequest) (*pbpeering.PeeringDeleteResponse, error) {
if !s.Config.PeeringEnabled {
return nil, peeringNotEnabledErr
}
if err := s.Backend.EnterpriseCheckPartitions(req.Partition); err != nil {
return nil, grpcstatus.Error(codes.InvalidArgument, err.Error())
}
@@ -505,6 +532,10 @@ func (s *Server) PeeringDelete(ctx context.Context, req *pbpeering.PeeringDelete
}
func (s *Server) TrustBundleRead(ctx context.Context, req *pbpeering.TrustBundleReadRequest) (*pbpeering.TrustBundleReadResponse, error) {
if !s.Config.PeeringEnabled {
return nil, peeringNotEnabledErr
}
if err := s.Backend.EnterpriseCheckPartitions(req.Partition); err != nil {
return nil, grpcstatus.Error(codes.InvalidArgument, err.Error())
}
@@ -540,6 +571,10 @@ func (s *Server) TrustBundleRead(ctx context.Context, req *pbpeering.TrustBundle
// TODO(peering): rename rpc & request/response to drop the "service" part
func (s *Server) TrustBundleListByService(ctx context.Context, req *pbpeering.TrustBundleListByServiceRequest) (*pbpeering.TrustBundleListByServiceResponse, error) {
if !s.Config.PeeringEnabled {
return nil, peeringNotEnabledErr
}
if err := s.Backend.EnterpriseCheckPartitions(req.Partition); err != nil {
return nil, grpcstatus.Error(codes.InvalidArgument, err.Error())
}


@@ -15,6 +15,8 @@ import (
"github.com/hashicorp/go-uuid"
"github.com/stretchr/testify/require"
gogrpc "google.golang.org/grpc"
"google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul"
@@ -529,6 +531,67 @@ func TestPeeringService_TrustBundleListByService(t *testing.T) {
require.Equal(t, []string{"foo-root-1"}, resp.Bundles[1].RootPEMs)
}
// Test RPC endpoint responses when peering is disabled. They should all return an error.
func TestPeeringService_PeeringDisabled(t *testing.T) {
// TODO(peering): see note on newTestServer, refactor to not use this
s := newTestServer(t, func(c *consul.Config) { c.PeeringEnabled = false })
client := pbpeering.NewPeeringServiceClient(s.ClientConn(t))
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(cancel)
// assertFailedResponse is a helper that checks that the error from a gRPC
// call is what we expect when peering is disabled.
assertFailedResponse := func(t *testing.T, err error) {
actErr, ok := grpcstatus.FromError(err)
require.True(t, ok)
require.Equal(t, codes.FailedPrecondition, actErr.Code())
require.Equal(t, "peering must be enabled to use this endpoint", actErr.Message())
}
// Test all the endpoints.
t.Run("PeeringWrite", func(t *testing.T) {
_, err := client.PeeringWrite(ctx, &pbpeering.PeeringWriteRequest{})
assertFailedResponse(t, err)
})
t.Run("PeeringRead", func(t *testing.T) {
_, err := client.PeeringRead(ctx, &pbpeering.PeeringReadRequest{})
assertFailedResponse(t, err)
})
t.Run("PeeringDelete", func(t *testing.T) {
_, err := client.PeeringDelete(ctx, &pbpeering.PeeringDeleteRequest{})
assertFailedResponse(t, err)
})
t.Run("PeeringList", func(t *testing.T) {
_, err := client.PeeringList(ctx, &pbpeering.PeeringListRequest{})
assertFailedResponse(t, err)
})
t.Run("Establish", func(t *testing.T) {
_, err := client.Establish(ctx, &pbpeering.EstablishRequest{})
assertFailedResponse(t, err)
})
t.Run("GenerateToken", func(t *testing.T) {
_, err := client.GenerateToken(ctx, &pbpeering.GenerateTokenRequest{})
assertFailedResponse(t, err)
})
t.Run("TrustBundleRead", func(t *testing.T) {
_, err := client.TrustBundleRead(ctx, &pbpeering.TrustBundleReadRequest{})
assertFailedResponse(t, err)
})
t.Run("TrustBundleListByService", func(t *testing.T) {
_, err := client.TrustBundleListByService(ctx, &pbpeering.TrustBundleListByServiceRequest{})
assertFailedResponse(t, err)
})
}
// newTestServer is copied from partition/service_test.go, with the addition of certs/cas.
// TODO(peering): these are endpoint tests and should live in the agent/consul
// package. Instead, these can be written around a mock client (see testing.go)


@@ -45,6 +45,8 @@ There are four specific cases covered with increasing complexity:
- [ ] Add that to `DefaultSource` in `agent/config/defaults.go`.
- [ ] Add a test case to the table test `TestLoad_IntegrationWithFlags` in
`agent/config/runtime_test.go`.
- [ ] If the config needs to be defaulted for the test server used in unit tests,
also add it to `DefaultConfig()` in `agent/consul/defaults.go`.
- [ ] **If** your config should take effect on a reload/HUP.
- [ ] Add necessary code to trigger a safe (locked or atomic) update to
any state the feature needs changing. This needs to be added to one or


@@ -551,6 +551,15 @@ Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'."
- `max_query_time` Equivalent to the [`-max-query-time` command-line flag](/docs/agent/config/cli-flags#_max_query_time).
- `peering` This object allows setting options for cluster peering.
The following sub-keys are available:
- `enabled` ((#peering_enabled)) (Defaults to `true`) Controls whether cluster peering is enabled.
Has no effect on Consul clients, only on Consul servers. When disabled, all peering APIs will return
an error, any peerings already stored in Consul will be ignored (but they will not be deleted), and
all peering connections from other clusters will be rejected. Added in Consul 1.13.0.
- `partition` <EnterpriseAlert inline /> - This flag is used to set
the name of the admin partition the agent belongs to. An agent can only join
and communicate with other agents within its admin partition. Review the