diff --git a/.changelog/_1391.txt b/.changelog/_1391.txt new file mode 100644 index 0000000000..f1dbe4909f --- /dev/null +++ b/.changelog/_1391.txt @@ -0,0 +1,3 @@ +```release-note:feature +partitions: **(Enterprise only)** Ensure partitions and serf-based WAN federation are mutually exclusive. +``` diff --git a/agent/consul/enterprise_server_oss.go b/agent/consul/enterprise_server_oss.go index 85c1b26f4e..cad141c112 100644 --- a/agent/consul/enterprise_server_oss.go +++ b/agent/consul/enterprise_server_oss.go @@ -88,7 +88,7 @@ func (s *Server) validateEnterpriseIntentionNamespace(ns string, _ bool) error { func (s *Server) setupSerfLAN(config *Config) error { var err error // Initialize the LAN Serf for the default network segment. - s.serfLAN, err = s.setupSerf(setupSerfOptions{ + s.serfLAN, _, err = s.setupSerf(setupSerfOptions{ Config: config.SerfLANConfig, EventCh: s.eventChLAN, SnapshotPath: serfLANSnapshot, diff --git a/agent/consul/merge.go b/agent/consul/merge.go index 04a41f0f53..3063058811 100644 --- a/agent/consul/merge.go +++ b/agent/consul/merge.go @@ -2,6 +2,7 @@ package consul import ( "fmt" + "sync" "github.com/hashicorp/go-version" "github.com/hashicorp/serf/serf" @@ -86,14 +87,41 @@ func (md *lanMergeDelegate) NotifyMerge(members []*serf.Member) error { // ring. We check that the peers are server nodes and abort the merge // otherwise. type wanMergeDelegate struct { + localDatacenter string + + federationDisabledLock sync.Mutex + federationDisabled bool +} + +// SetWANFederationDisabled selectively disables the wan pool from accepting +// non-local members. If the toggle changed the current value it returns true. +func (md *wanMergeDelegate) SetWANFederationDisabled(disabled bool) bool { + md.federationDisabledLock.Lock() + prior := md.federationDisabled + md.federationDisabled = disabled + md.federationDisabledLock.Unlock() + + return prior != disabled } func (md *wanMergeDelegate) NotifyMerge(members []*serf.Member) error { + // Deliberately hold this lock during the entire merge so calls to + // SetWANFederationDisabled returning immediately imply that the flag takes + // effect for all future merges. + md.federationDisabledLock.Lock() + defer md.federationDisabledLock.Unlock() + for _, m := range members { - ok, _ := metadata.IsConsulServer(*m) + ok, srv := metadata.IsConsulServer(*m) if !ok { return fmt.Errorf("Member '%s' is not a server", m.Name) } + + if md.federationDisabled { + if srv.Datacenter != md.localDatacenter { + return fmt.Errorf("Member '%s' part of wrong datacenter '%s'; WAN federation is disabled", m.Name, srv.Datacenter) + } + } } return nil } diff --git a/agent/consul/merge_test.go b/agent/consul/merge_test.go index 7219edcab1..1a8c57bd83 100644 --- a/agent/consul/merge_test.go +++ b/agent/consul/merge_test.go @@ -138,10 +138,16 @@ func TestMerge_WAN(t *testing.T) { type testcase struct { members []*serf.Member expect string + setupFn func(t *testing.T, delegate *wanMergeDelegate) } run := func(t *testing.T, tc testcase) { - delegate := &wanMergeDelegate{} + delegate := &wanMergeDelegate{ + localDatacenter: "dc1", + } + if tc.setupFn != nil { + tc.setupFn(t, delegate) + } err := delegate.NotifyMerge(tc.members) if tc.expect == "" { require.NoError(t, err) @@ -177,7 +183,33 @@ func TestMerge_WAN(t *testing.T) { build: "0.7.5", }), }, - expect: "", + }, + "federation disabled and local join allowed": { + setupFn: func(t *testing.T, delegate *wanMergeDelegate) { + delegate.SetWANFederationDisabled(true) + }, + members: []*serf.Member{ + makeTestNode(t, testMember{ + dc: "dc1", + name: "node1", + server: true, + build: "0.7.5", + }), + }, + }, + "federation disabled and remote join blocked": { + setupFn: func(t *testing.T, delegate *wanMergeDelegate) { + delegate.SetWANFederationDisabled(true) + }, + members: []*serf.Member{ + makeTestNode(t, testMember{ + dc: "dc2", + name: "node1", + server: true, + build: "0.7.5", + }), + }, + expect: `WAN federation is disabled`, }, } diff --git a/agent/consul/server.go b/agent/consul/server.go index 554d5cbe58..cf2b0125c0 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -189,6 +189,12 @@ type Server struct { // serf cluster that spans datacenters eventChWAN chan serf.Event + // wanMembershipNotifyCh is used to receive notifications that the the + // serfWAN wan pool may have changed. + // + // If this is nil, notification is skipped. + wanMembershipNotifyCh chan struct{} + // fsm is the state machine used with Raft to provide // strong consistency. fsm *fsm.FSM @@ -266,6 +272,7 @@ type Server struct { // serfWAN is the Serf cluster maintained between DC's // which SHOULD only consist of Consul servers serfWAN *serf.Serf + serfWANConfig *serf.Config memberlistTransportWAN wanfed.IngestionAwareTransport gatewayLocator *GatewayLocator @@ -493,7 +500,7 @@ func NewServer(config *Config, flat Deps) (*Server, error) { // Initialize the WAN Serf if enabled if config.SerfWANConfig != nil { - s.serfWAN, err = s.setupSerf(setupSerfOptions{ + s.serfWAN, s.serfWANConfig, err = s.setupSerf(setupSerfOptions{ Config: config.SerfWANConfig, EventCh: s.eventChWAN, SnapshotPath: serfWANSnapshot, @@ -548,7 +555,7 @@ func NewServer(config *Config, flat Deps) (*Server, error) { s.Shutdown() return nil, fmt.Errorf("Failed to add WAN serf route: %v", err) } - go router.HandleSerfEvents(s.logger, s.router, types.AreaWAN, s.serfWAN.ShutdownCh(), s.eventChWAN) + go router.HandleSerfEvents(s.logger, s.router, types.AreaWAN, s.serfWAN.ShutdownCh(), s.eventChWAN, s.wanMembershipNotifyCh) // Fire up the LAN <-> WAN join flooder. addrFn := func(s *metadata.Server) (string, error) { @@ -1124,6 +1131,11 @@ func (s *Server) JoinWAN(addrs []string) (int, error) { if s.serfWAN == nil { return 0, ErrWANFederationDisabled } + + if err := s.enterpriseValidateJoinWAN(); err != nil { + return 0, err + } + return s.serfWAN.Join(addrs, true) } diff --git a/agent/consul/server_oss.go b/agent/consul/server_oss.go index f6217b9991..7d4830d1bb 100644 --- a/agent/consul/server_oss.go +++ b/agent/consul/server_oss.go @@ -19,6 +19,10 @@ import ( func (s *Server) registerEnterpriseGRPCServices(deps Deps, srv *grpc.Server) {} +func (s *Server) enterpriseValidateJoinWAN() error { + return nil // no-op +} + // JoinLAN is used to have Consul join the inner-DC pool The target address // should be another node inside the DC listening on the Serf LAN address func (s *Server) JoinLAN(addrs []string, entMeta *structs.EnterpriseMeta) (int, error) { diff --git a/agent/consul/server_serf.go b/agent/consul/server_serf.go index f5864f654e..44c3f857a4 100644 --- a/agent/consul/server_serf.go +++ b/agent/consul/server_serf.go @@ -48,12 +48,18 @@ type setupSerfOptions struct { } // setupSerf is used to setup and initialize a Serf -func (s *Server) setupSerf(opts setupSerfOptions) (*serf.Serf, error) { +func (s *Server) setupSerf(opts setupSerfOptions) (*serf.Serf, *serf.Config, error) { conf, err := s.setupSerfConfig(opts) if err != nil { - return nil, err + return nil, nil, err } - return serf.Create(conf) + + cluster, err := serf.Create(conf) + if err != nil { + return nil, nil, err + } + + return cluster, conf, nil } func (s *Server) setupSerfConfig(opts setupSerfOptions) (*serf.Config, error) { @@ -152,7 +158,9 @@ func (s *Server) setupSerfConfig(opts setupSerfOptions) (*serf.Config, error) { conf.ProtocolVersion = protocolVersionMap[s.config.ProtocolVersion] conf.RejoinAfterLeave = s.config.RejoinAfterLeave if opts.WAN { - conf.Merge = &wanMergeDelegate{} + conf.Merge = &wanMergeDelegate{ + localDatacenter: s.config.Datacenter, + } } else { conf.Merge = &lanMergeDelegate{ dc: s.config.Datacenter, diff --git a/agent/router/serf_adapter.go b/agent/router/serf_adapter.go index b051b2f965..7208fe1236 100644 --- a/agent/router/serf_adapter.go +++ b/agent/router/serf_adapter.go @@ -1,10 +1,11 @@ package router import ( - "github.com/hashicorp/consul/agent/metadata" - "github.com/hashicorp/consul/types" "github.com/hashicorp/go-hclog" "github.com/hashicorp/serf/serf" + + "github.com/hashicorp/consul/agent/metadata" + "github.com/hashicorp/consul/types" ) // routerFn selects one of the router operations to map to incoming Serf events. @@ -50,7 +51,18 @@ func handleMemberEvent(logger hclog.Logger, fn routerFn, areaID types.AreaID, e // HandleSerfEvents is a long-running goroutine that pushes incoming events from // a Serf manager's channel into the given router. This will return when the // shutdown channel is closed. -func HandleSerfEvents(logger hclog.Logger, router *Router, areaID types.AreaID, shutdownCh <-chan struct{}, eventCh <-chan serf.Event) { +// +// If membershipNotifyCh is non-nil, it must be a buffered channel of size one +// with one consumer. That consumer will be notified when +// Join/Leave/Failed/Update occur on this serf pool. +func HandleSerfEvents( + logger hclog.Logger, + router *Router, + areaID types.AreaID, + shutdownCh <-chan struct{}, + eventCh <-chan serf.Event, + membershipNotifyCh chan<- struct{}, +) { for { select { case <-shutdownCh: @@ -60,15 +72,19 @@ func HandleSerfEvents(logger hclog.Logger, router *Router, areaID types.AreaID, switch e.EventType() { case serf.EventMemberJoin: handleMemberEvent(logger, router.AddServer, areaID, e) + notifyMembershipPossibleChange(membershipNotifyCh) case serf.EventMemberLeave, serf.EventMemberReap: handleMemberEvent(logger, router.RemoveServer, areaID, e) + notifyMembershipPossibleChange(membershipNotifyCh) case serf.EventMemberFailed: handleMemberEvent(logger, router.FailServer, areaID, e) + notifyMembershipPossibleChange(membershipNotifyCh) case serf.EventMemberUpdate: handleMemberEvent(logger, router.AddServer, areaID, e) + notifyMembershipPossibleChange(membershipNotifyCh) // All of these event types are ignored. case serf.EventUser: @@ -80,3 +96,15 @@ func HandleSerfEvents(logger hclog.Logger, router *Router, areaID types.AreaID, } } } + +func notifyMembershipPossibleChange(membershipNotifyCh chan<- struct{}) { + if membershipNotifyCh == nil { + return + } + + // Notify if not already notified. + select { + case membershipNotifyCh <- struct{}{}: + default: + } +}