light refactors to support making partitions and serf-based wan federation are mutually exclusive (#11755)

This commit is contained in:
R.B. Boyer 2021-12-06 13:18:02 -06:00 committed by GitHub
parent 05fabbe898
commit b1605639fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 128 additions and 13 deletions

3
.changelog/_1391.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:feature
partitions: **(Enterprise only)** Ensure partitions and serf-based WAN federation are mutually exclusive.
```

View File

@ -88,7 +88,7 @@ func (s *Server) validateEnterpriseIntentionNamespace(ns string, _ bool) error {
func (s *Server) setupSerfLAN(config *Config) error {
var err error
// Initialize the LAN Serf for the default network segment.
s.serfLAN, err = s.setupSerf(setupSerfOptions{
s.serfLAN, _, err = s.setupSerf(setupSerfOptions{
Config: config.SerfLANConfig,
EventCh: s.eventChLAN,
SnapshotPath: serfLANSnapshot,

View File

@ -2,6 +2,7 @@ package consul
import (
"fmt"
"sync"
"github.com/hashicorp/go-version"
"github.com/hashicorp/serf/serf"
@ -86,14 +87,41 @@ func (md *lanMergeDelegate) NotifyMerge(members []*serf.Member) error {
// ring. We check that the peers are server nodes and abort the merge
// otherwise.
type wanMergeDelegate struct {
localDatacenter string
federationDisabledLock sync.Mutex
federationDisabled bool
}
// SetWANFederationDisabled selectively disables the wan pool from accepting
// non-local members. If the toggle changed the current value it returns true.
func (md *wanMergeDelegate) SetWANFederationDisabled(disabled bool) bool {
md.federationDisabledLock.Lock()
prior := md.federationDisabled
md.federationDisabled = disabled
md.federationDisabledLock.Unlock()
return prior != disabled
}
func (md *wanMergeDelegate) NotifyMerge(members []*serf.Member) error {
// Deliberately hold this lock during the entire merge so calls to
// SetWANFederationDisabled returning immediately imply that the flag takes
// effect for all future merges.
md.federationDisabledLock.Lock()
defer md.federationDisabledLock.Unlock()
for _, m := range members {
ok, _ := metadata.IsConsulServer(*m)
ok, srv := metadata.IsConsulServer(*m)
if !ok {
return fmt.Errorf("Member '%s' is not a server", m.Name)
}
if md.federationDisabled {
if srv.Datacenter != md.localDatacenter {
return fmt.Errorf("Member '%s' part of wrong datacenter '%s'; WAN federation is disabled", m.Name, srv.Datacenter)
}
}
}
return nil
}

View File

@ -138,10 +138,16 @@ func TestMerge_WAN(t *testing.T) {
type testcase struct {
members []*serf.Member
expect string
setupFn func(t *testing.T, delegate *wanMergeDelegate)
}
run := func(t *testing.T, tc testcase) {
delegate := &wanMergeDelegate{}
delegate := &wanMergeDelegate{
localDatacenter: "dc1",
}
if tc.setupFn != nil {
tc.setupFn(t, delegate)
}
err := delegate.NotifyMerge(tc.members)
if tc.expect == "" {
require.NoError(t, err)
@ -177,7 +183,33 @@ func TestMerge_WAN(t *testing.T) {
build: "0.7.5",
}),
},
expect: "",
},
"federation disabled and local join allowed": {
setupFn: func(t *testing.T, delegate *wanMergeDelegate) {
delegate.SetWANFederationDisabled(true)
},
members: []*serf.Member{
makeTestNode(t, testMember{
dc: "dc1",
name: "node1",
server: true,
build: "0.7.5",
}),
},
},
"federation disabled and remote join blocked": {
setupFn: func(t *testing.T, delegate *wanMergeDelegate) {
delegate.SetWANFederationDisabled(true)
},
members: []*serf.Member{
makeTestNode(t, testMember{
dc: "dc2",
name: "node1",
server: true,
build: "0.7.5",
}),
},
expect: `WAN federation is disabled`,
},
}

View File

@ -189,6 +189,12 @@ type Server struct {
// serf cluster that spans datacenters
eventChWAN chan serf.Event
// wanMembershipNotifyCh is used to receive notifications that the the
// serfWAN wan pool may have changed.
//
// If this is nil, notification is skipped.
wanMembershipNotifyCh chan struct{}
// fsm is the state machine used with Raft to provide
// strong consistency.
fsm *fsm.FSM
@ -266,6 +272,7 @@ type Server struct {
// serfWAN is the Serf cluster maintained between DC's
// which SHOULD only consist of Consul servers
serfWAN *serf.Serf
serfWANConfig *serf.Config
memberlistTransportWAN wanfed.IngestionAwareTransport
gatewayLocator *GatewayLocator
@ -493,7 +500,7 @@ func NewServer(config *Config, flat Deps) (*Server, error) {
// Initialize the WAN Serf if enabled
if config.SerfWANConfig != nil {
s.serfWAN, err = s.setupSerf(setupSerfOptions{
s.serfWAN, s.serfWANConfig, err = s.setupSerf(setupSerfOptions{
Config: config.SerfWANConfig,
EventCh: s.eventChWAN,
SnapshotPath: serfWANSnapshot,
@ -548,7 +555,7 @@ func NewServer(config *Config, flat Deps) (*Server, error) {
s.Shutdown()
return nil, fmt.Errorf("Failed to add WAN serf route: %v", err)
}
go router.HandleSerfEvents(s.logger, s.router, types.AreaWAN, s.serfWAN.ShutdownCh(), s.eventChWAN)
go router.HandleSerfEvents(s.logger, s.router, types.AreaWAN, s.serfWAN.ShutdownCh(), s.eventChWAN, s.wanMembershipNotifyCh)
// Fire up the LAN <-> WAN join flooder.
addrFn := func(s *metadata.Server) (string, error) {
@ -1124,6 +1131,11 @@ func (s *Server) JoinWAN(addrs []string) (int, error) {
if s.serfWAN == nil {
return 0, ErrWANFederationDisabled
}
if err := s.enterpriseValidateJoinWAN(); err != nil {
return 0, err
}
return s.serfWAN.Join(addrs, true)
}

View File

@ -19,6 +19,10 @@ import (
func (s *Server) registerEnterpriseGRPCServices(deps Deps, srv *grpc.Server) {}
func (s *Server) enterpriseValidateJoinWAN() error {
return nil // no-op
}
// JoinLAN is used to have Consul join the inner-DC pool The target address
// should be another node inside the DC listening on the Serf LAN address
func (s *Server) JoinLAN(addrs []string, entMeta *structs.EnterpriseMeta) (int, error) {

View File

@ -48,12 +48,18 @@ type setupSerfOptions struct {
}
// setupSerf is used to setup and initialize a Serf
func (s *Server) setupSerf(opts setupSerfOptions) (*serf.Serf, error) {
func (s *Server) setupSerf(opts setupSerfOptions) (*serf.Serf, *serf.Config, error) {
conf, err := s.setupSerfConfig(opts)
if err != nil {
return nil, err
return nil, nil, err
}
return serf.Create(conf)
cluster, err := serf.Create(conf)
if err != nil {
return nil, nil, err
}
return cluster, conf, nil
}
func (s *Server) setupSerfConfig(opts setupSerfOptions) (*serf.Config, error) {
@ -152,7 +158,9 @@ func (s *Server) setupSerfConfig(opts setupSerfOptions) (*serf.Config, error) {
conf.ProtocolVersion = protocolVersionMap[s.config.ProtocolVersion]
conf.RejoinAfterLeave = s.config.RejoinAfterLeave
if opts.WAN {
conf.Merge = &wanMergeDelegate{}
conf.Merge = &wanMergeDelegate{
localDatacenter: s.config.Datacenter,
}
} else {
conf.Merge = &lanMergeDelegate{
dc: s.config.Datacenter,

View File

@ -1,10 +1,11 @@
package router
import (
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/serf/serf"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/types"
)
// routerFn selects one of the router operations to map to incoming Serf events.
@ -50,7 +51,18 @@ func handleMemberEvent(logger hclog.Logger, fn routerFn, areaID types.AreaID, e
// HandleSerfEvents is a long-running goroutine that pushes incoming events from
// a Serf manager's channel into the given router. This will return when the
// shutdown channel is closed.
func HandleSerfEvents(logger hclog.Logger, router *Router, areaID types.AreaID, shutdownCh <-chan struct{}, eventCh <-chan serf.Event) {
//
// If membershipNotifyCh is non-nil, it must be a buffered channel of size one
// with one consumer. That consumer will be notified when
// Join/Leave/Failed/Update occur on this serf pool.
func HandleSerfEvents(
logger hclog.Logger,
router *Router,
areaID types.AreaID,
shutdownCh <-chan struct{},
eventCh <-chan serf.Event,
membershipNotifyCh chan<- struct{},
) {
for {
select {
case <-shutdownCh:
@ -60,15 +72,19 @@ func HandleSerfEvents(logger hclog.Logger, router *Router, areaID types.AreaID,
switch e.EventType() {
case serf.EventMemberJoin:
handleMemberEvent(logger, router.AddServer, areaID, e)
notifyMembershipPossibleChange(membershipNotifyCh)
case serf.EventMemberLeave, serf.EventMemberReap:
handleMemberEvent(logger, router.RemoveServer, areaID, e)
notifyMembershipPossibleChange(membershipNotifyCh)
case serf.EventMemberFailed:
handleMemberEvent(logger, router.FailServer, areaID, e)
notifyMembershipPossibleChange(membershipNotifyCh)
case serf.EventMemberUpdate:
handleMemberEvent(logger, router.AddServer, areaID, e)
notifyMembershipPossibleChange(membershipNotifyCh)
// All of these event types are ignored.
case serf.EventUser:
@ -80,3 +96,15 @@ func HandleSerfEvents(logger hclog.Logger, router *Router, areaID types.AreaID,
}
}
}
func notifyMembershipPossibleChange(membershipNotifyCh chan<- struct{}) {
if membershipNotifyCh == nil {
return
}
// Notify if not already notified.
select {
case membershipNotifyCh <- struct{}{}:
default:
}
}