Fix CA Replication when ACLs are enabled (#6201)

Secondary CA initialization steps are:

• Wait until the primary is capable of signing intermediate certs. We use serf metadata to check the versions of the servers in the primary, which avoids needing a token the way the previous RPC-based implementation did. We require at least one alive server in the primary and that all alive servers there meet the version requirement.
• Initialize the secondary CA by getting the primary to sign an intermediate.

When a primary DC is configured, if no existing CA is initialized and we cannot initialize a secondary CA for whatever reason, the secondary DC will remain without a CA. As soon as it can, it will initialize the secondary CA by pulling the primary's roots and getting the primary to sign an intermediate.
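
For context on the version gate above, here is a minimal, self-contained sketch of checking the primary datacenter's server versions from serf WAN member tags. It assumes plain role, dc, and build tags on each member; the real implementation parses members through metadata.IsConsulServer and lives in ServersInDCMeetMinimumVersion in the diff below.

package main

import (
	"fmt"

	"github.com/hashicorp/go-version"
	"github.com/hashicorp/serf/serf"
)

// primaryMeetsMinVersion reports whether every alive server in the given
// datacenter is at least minVersion and whether any alive server in that
// datacenter was seen at all. Both must hold before the secondary CA is
// initialized.
func primaryMeetsMinVersion(members []serf.Member, datacenter string, minVersion *version.Version) (ok, found bool) {
	for _, m := range members {
		if m.Status != serf.StatusAlive || m.Tags["role"] != "consul" || m.Tags["dc"] != datacenter {
			continue
		}
		found = true
		build, err := version.NewVersion(m.Tags["build"])
		if err != nil || build.LessThan(minVersion) {
			// An unparseable build tag or an old server fails the check.
			return false, found
		}
	}
	return true, found
}

func main() {
	min := version.Must(version.NewVersion("1.6.0"))
	// In a server these would come from s.WANMembers().
	members := []serf.Member{
		{Status: serf.StatusAlive, Tags: map[string]string{"role": "consul", "dc": "primary", "build": "1.6.0"}},
	}
	ok, found := primaryMeetsMinVersion(members, "primary", min)
	fmt.Println(ok, found)
}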

This also fixes a segfault that can happen during leadership revocation. There was a spot in secondaryCARootWatch that retrieved the CA provider and executed methods on it without a nil check. Under normal circumstances it won't be nil, but during leadership revocation it gets nil'ed out, so there is a window between closing the stop channel and the goroutine actually stopping in which it could read a nil provider and cause a segfault.
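
Concretely, the guard added to secondaryCARootWatch in this change treats a nil provider as a sign that leadership is being revoked and skips the iteration rather than dereferencing it:

provider, _ := s.getCAProvider()
if provider == nil {
	// Leadership is being revoked and the stop channel will terminate this
	// goroutine shortly; don't call methods on a nil provider.
	return nil
}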
Matt Keeler 2019-07-26 15:57:57 -04:00 committed by GitHub
parent 59454c7edc
commit 35e67b1d1a
8 changed files with 629 additions and 611 deletions


@ -6,9 +6,16 @@ import (
"net"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/go-version"
"github.com/hashicorp/serf/serf"
)
var (
// minMultiDCConnectVersion is the minimum version in order to support multi-DC Connect
// features.
minMultiDCConnectVersion = version.Must(version.NewVersion("1.6.0"))
)
type EnterpriseServer struct{}
func (s *Server) initEnterprise() error {


@ -5,15 +5,12 @@ import (
"fmt"
"net"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
metrics "github.com/armon/go-metrics"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/connect"
ca "github.com/hashicorp/consul/agent/connect/ca"
"github.com/hashicorp/consul/agent/consul/autopilot"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/structs"
@ -329,8 +326,6 @@ func (s *Server) establishLeadership() error {
s.startConnectLeader()
s.startCARootPruning()
s.setConsistentReadReady()
return nil
}
@ -349,8 +344,6 @@ func (s *Server) revokeLeadership() {
s.stopConnectLeader()
s.stopCARootPruning()
s.setCAProvider(nil, nil)
s.stopACLTokenReaping()
@ -968,272 +961,6 @@ func (s *Server) bootstrapConfigEntries(entries []structs.ConfigEntry) error {
return nil
}
// initializeCAConfig is used to initialize the CA config if necessary
// when setting up the CA during establishLeadership
func (s *Server) initializeCAConfig() (*structs.CAConfiguration, error) {
state := s.fsm.State()
_, config, err := state.CAConfig()
if err != nil {
return nil, err
}
if config != nil {
return config, nil
}
config = s.config.CAConfig
if config.ClusterID == "" {
id, err := uuid.GenerateUUID()
if err != nil {
return nil, err
}
config.ClusterID = id
}
req := structs.CARequest{
Op: structs.CAOpSetConfig,
Config: config,
}
if _, err = s.raftApply(structs.ConnectCARequestType, req); err != nil {
return nil, err
}
return config, nil
}
// initializeRootCA runs the initialization logic for a root CA.
func (s *Server) initializeRootCA(provider ca.Provider, conf *structs.CAConfiguration) error {
if err := provider.Configure(conf.ClusterID, true, conf.Config); err != nil {
return fmt.Errorf("error configuring provider: %v", err)
}
if err := provider.GenerateRoot(); err != nil {
return fmt.Errorf("error generating CA root certificate: %v", err)
}
// Get the active root cert from the CA
rootPEM, err := provider.ActiveRoot()
if err != nil {
return fmt.Errorf("error getting root cert: %v", err)
}
rootCA, err := parseCARoot(rootPEM, conf.Provider, conf.ClusterID)
if err != nil {
return err
}
// Check if the CA root is already initialized and exit if it is,
// adding on any existing intermediate certs since they aren't directly
// tied to the provider.
// Every change to the CA after this initial bootstrapping should
// be done through the rotation process.
state := s.fsm.State()
_, activeRoot, err := state.CARootActive(nil)
if err != nil {
return err
}
if activeRoot != nil {
// This state shouldn't be possible to get into because we update the root and
// CA config in the same FSM operation.
if activeRoot.ID != rootCA.ID {
return fmt.Errorf("stored CA root %q is not the active root (%s)", rootCA.ID, activeRoot.ID)
}
rootCA.IntermediateCerts = activeRoot.IntermediateCerts
s.setCAProvider(provider, rootCA)
return nil
}
// Get the highest index
idx, _, err := state.CARoots(nil)
if err != nil {
return err
}
// Store the root cert in raft
resp, err := s.raftApply(structs.ConnectCARequestType, &structs.CARequest{
Op: structs.CAOpSetRoots,
Index: idx,
Roots: []*structs.CARoot{rootCA},
})
if err != nil {
s.logger.Printf("[ERR] connect: Apply failed %v", err)
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
s.setCAProvider(provider, rootCA)
s.logger.Printf("[INFO] connect: initialized primary datacenter CA with provider %q", conf.Provider)
return nil
}
// parseCARoot returns a filled-in structs.CARoot from a raw PEM value.
func parseCARoot(pemValue, provider, clusterID string) (*structs.CARoot, error) {
id, err := connect.CalculateCertFingerprint(pemValue)
if err != nil {
return nil, fmt.Errorf("error parsing root fingerprint: %v", err)
}
rootCert, err := connect.ParseCert(pemValue)
if err != nil {
return nil, fmt.Errorf("error parsing root cert: %v", err)
}
return &structs.CARoot{
ID: id,
Name: fmt.Sprintf("%s CA Root Cert", strings.Title(provider)),
SerialNumber: rootCert.SerialNumber.Uint64(),
SigningKeyID: connect.HexString(rootCert.SubjectKeyId),
ExternalTrustDomain: clusterID,
NotBefore: rootCert.NotBefore,
NotAfter: rootCert.NotAfter,
RootCert: pemValue,
Active: true,
}, nil
}
// createProvider returns a connect CA provider from the given config.
func (s *Server) createCAProvider(conf *structs.CAConfiguration) (ca.Provider, error) {
switch conf.Provider {
case structs.ConsulCAProvider:
return &ca.ConsulProvider{Delegate: &consulCADelegate{s}}, nil
case structs.VaultCAProvider:
return &ca.VaultProvider{}, nil
default:
return nil, fmt.Errorf("unknown CA provider %q", conf.Provider)
}
}
func (s *Server) getCAProvider() (ca.Provider, *structs.CARoot) {
retries := 0
var result ca.Provider
var resultRoot *structs.CARoot
for result == nil {
s.caProviderLock.RLock()
result = s.caProvider
resultRoot = s.caProviderRoot
s.caProviderLock.RUnlock()
// In cases where an agent is started with managed proxies, we may ask
// for the provider before establishLeadership completes. If we're the
// leader, then wait and get the provider again
if result == nil && s.IsLeader() && retries < 10 {
retries++
time.Sleep(50 * time.Millisecond)
continue
}
break
}
return result, resultRoot
}
func (s *Server) setCAProvider(newProvider ca.Provider, root *structs.CARoot) {
s.caProviderLock.Lock()
defer s.caProviderLock.Unlock()
s.caProvider = newProvider
s.caProviderRoot = root
}
// startCARootPruning starts a goroutine that looks for stale CARoots
// and removes them from the state store.
func (s *Server) startCARootPruning() {
s.caPruningLock.Lock()
defer s.caPruningLock.Unlock()
if s.caPruningEnabled {
return
}
s.caPruningCh = make(chan struct{})
go func() {
ticker := time.NewTicker(caRootPruneInterval)
defer ticker.Stop()
for {
select {
case <-s.caPruningCh:
return
case <-ticker.C:
if err := s.pruneCARoots(); err != nil {
s.logger.Printf("[ERR] connect: error pruning CA roots: %v", err)
}
}
}
}()
s.caPruningEnabled = true
}
// pruneCARoots looks for any CARoots that have been rotated out and expired.
func (s *Server) pruneCARoots() error {
if !s.config.ConnectEnabled {
return nil
}
state := s.fsm.State()
idx, roots, err := state.CARoots(nil)
if err != nil {
return err
}
_, caConf, err := state.CAConfig()
if err != nil {
return err
}
common, err := caConf.GetCommonConfig()
if err != nil {
return err
}
var newRoots structs.CARoots
for _, r := range roots {
if !r.Active && !r.RotatedOutAt.IsZero() && time.Since(r.RotatedOutAt) > common.LeafCertTTL*2 {
s.logger.Printf("[INFO] connect: pruning old unused root CA (ID: %s)", r.ID)
continue
}
newRoot := *r
newRoots = append(newRoots, &newRoot)
}
// Return early if there's nothing to remove.
if len(newRoots) == len(roots) {
return nil
}
// Commit the new root state.
var args structs.CARequest
args.Op = structs.CAOpSetRoots
args.Index = idx
args.Roots = newRoots
resp, err := s.raftApply(structs.ConnectCARequestType, args)
if err != nil {
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
return nil
}
// stopCARootPruning stops the CARoot pruning process.
func (s *Server) stopCARootPruning() {
s.caPruningLock.Lock()
defer s.caPruningLock.Unlock()
if !s.caPruningEnabled {
return
}
close(s.caPruningCh)
s.caPruningEnabled = false
}
// reconcileReaped is used to reconcile nodes that have failed and been reaped
// from Serf but remain in the catalog. This is done by looking for unknown nodes with serfHealth checks registered.
// We generate a "reap" event to cause the node to be cleaned up.


@ -3,7 +3,6 @@ package consul
import (
"bytes"
"context"
"errors"
"fmt"
"strings"
"time"
@ -12,9 +11,8 @@ import (
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/connect/ca"
"github.com/hashicorp/consul/agent/consul/autopilot"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-version"
uuid "github.com/hashicorp/go-uuid"
)
const (
@ -36,17 +34,110 @@ var (
// queries when backing off.
maxRetryBackoff = 256
// minMultiDCConnectVersion is the minimum version in order to support multi-DC Connect
// features.
minMultiDCConnectVersion = version.Must(version.NewVersion("1.4.0"))
// maxRootsQueryTime is the maximum time the primary roots watch query can block before
// returning.
maxRootsQueryTime = maxQueryTime
errEmptyVersion = errors.New("version string is empty")
)
// initializeCAConfig is used to initialize the CA config if necessary
// when setting up the CA during establishLeadership
func (s *Server) initializeCAConfig() (*structs.CAConfiguration, error) {
state := s.fsm.State()
_, config, err := state.CAConfig()
if err != nil {
return nil, err
}
if config != nil {
return config, nil
}
config = s.config.CAConfig
if config.ClusterID == "" {
id, err := uuid.GenerateUUID()
if err != nil {
return nil, err
}
config.ClusterID = id
}
req := structs.CARequest{
Op: structs.CAOpSetConfig,
Config: config,
}
if _, err = s.raftApply(structs.ConnectCARequestType, req); err != nil {
return nil, err
}
return config, nil
}
// parseCARoot returns a filled-in structs.CARoot from a raw PEM value.
func parseCARoot(pemValue, provider, clusterID string) (*structs.CARoot, error) {
id, err := connect.CalculateCertFingerprint(pemValue)
if err != nil {
return nil, fmt.Errorf("error parsing root fingerprint: %v", err)
}
rootCert, err := connect.ParseCert(pemValue)
if err != nil {
return nil, fmt.Errorf("error parsing root cert: %v", err)
}
return &structs.CARoot{
ID: id,
Name: fmt.Sprintf("%s CA Root Cert", strings.Title(provider)),
SerialNumber: rootCert.SerialNumber.Uint64(),
SigningKeyID: connect.HexString(rootCert.SubjectKeyId),
ExternalTrustDomain: clusterID,
NotBefore: rootCert.NotBefore,
NotAfter: rootCert.NotAfter,
RootCert: pemValue,
Active: true,
}, nil
}
// createProvider returns a connect CA provider from the given config.
func (s *Server) createCAProvider(conf *structs.CAConfiguration) (ca.Provider, error) {
switch conf.Provider {
case structs.ConsulCAProvider:
return &ca.ConsulProvider{Delegate: &consulCADelegate{s}}, nil
case structs.VaultCAProvider:
return &ca.VaultProvider{}, nil
default:
return nil, fmt.Errorf("unknown CA provider %q", conf.Provider)
}
}
func (s *Server) getCAProvider() (ca.Provider, *structs.CARoot) {
retries := 0
var result ca.Provider
var resultRoot *structs.CARoot
for result == nil {
s.caProviderLock.RLock()
result = s.caProvider
resultRoot = s.caProviderRoot
s.caProviderLock.RUnlock()
// In cases where an agent is started with managed proxies, we may ask
// for the provider before establishLeadership completes. If we're the
// leader, then wait and get the provider again
if result == nil && s.IsLeader() && retries < 10 {
retries++
time.Sleep(50 * time.Millisecond)
continue
}
break
}
return result, resultRoot
}
func (s *Server) setCAProvider(newProvider ca.Provider, root *structs.CARoot) {
s.caProviderLock.Lock()
defer s.caProviderLock.Unlock()
s.caProvider = newProvider
s.caProviderRoot = root
}
// initializeCA sets up the CA provider when gaining leadership, either bootstrapping
// the CA if this is the primary DC or making a remote RPC for intermediate signing
// if this is a secondary DC.
@ -67,26 +158,19 @@ func (s *Server) initializeCA() error {
}
s.setCAProvider(provider, nil)
// Check whether the primary DC has been upgraded to support multi-DC Connect.
// If it hasn't, we skip the secondary initialization routine and continue acting
// as a primary DC. This is periodically re-checked in the goroutine watching the
// primary's CA roots so that we can transition to a secondary DC when it has
// been upgraded.
var primaryHasVersion bool
// If this isn't the primary DC, run the secondary DC routine if the primary has already been upgraded to at least 1.6.0
if s.config.PrimaryDatacenter != s.config.Datacenter {
primaryHasVersion, err = s.datacentersMeetMinVersion(minMultiDCConnectVersion)
if err == errEmptyVersion {
s.logger.Printf("[WARN] connect: primary datacenter %q is reachable but not yet initialized", s.config.PrimaryDatacenter)
versionOk, foundPrimary := ServersInDCMeetMinimumVersion(s.WANMembers(), s.config.PrimaryDatacenter, minMultiDCConnectVersion)
if !foundPrimary {
s.logger.Printf("[WARN] connect: primary datacenter is configured but unreachable - deferring initialization of the secondary datacenter CA")
// return nil because we will initialize the secondary CA later
return nil
} else if err != nil {
s.logger.Printf("[ERR] connect: error initializing CA: could not query primary datacenter: %v", err)
} else if !versionOk {
// return nil because we will initialize the secondary CA later
s.logger.Printf("[WARN] connect: servers in the primary datacenter are not at least at version %s - deferring initialization of the secondary datacenter CA", minMultiDCConnectVersion)
return nil
}
}
// If this isn't the primary DC, run the secondary DC routine if the primary has already
// been upgraded to at least 1.4.0.
if s.config.PrimaryDatacenter != s.config.Datacenter && primaryHasVersion {
// Get the root CA to see if we need to refresh our intermediate.
args := structs.DCSpecificRequest{
Datacenter: s.config.PrimaryDatacenter,
@ -111,6 +195,76 @@ func (s *Server) initializeCA() error {
return s.initializeRootCA(provider, conf)
}
// initializeRootCA runs the initialization logic for a root CA.
func (s *Server) initializeRootCA(provider ca.Provider, conf *structs.CAConfiguration) error {
if err := provider.Configure(conf.ClusterID, true, conf.Config); err != nil {
return fmt.Errorf("error configuring provider: %v", err)
}
if err := provider.GenerateRoot(); err != nil {
return fmt.Errorf("error generating CA root certificate: %v", err)
}
// Get the active root cert from the CA
rootPEM, err := provider.ActiveRoot()
if err != nil {
return fmt.Errorf("error getting root cert: %v", err)
}
rootCA, err := parseCARoot(rootPEM, conf.Provider, conf.ClusterID)
if err != nil {
return err
}
// Check if the CA root is already initialized and exit if it is,
// adding on any existing intermediate certs since they aren't directly
// tied to the provider.
// Every change to the CA after this initial bootstrapping should
// be done through the rotation process.
state := s.fsm.State()
_, activeRoot, err := state.CARootActive(nil)
if err != nil {
return err
}
if activeRoot != nil {
// This state shouldn't be possible to get into because we update the root and
// CA config in the same FSM operation.
if activeRoot.ID != rootCA.ID {
return fmt.Errorf("stored CA root %q is not the active root (%s)", rootCA.ID, activeRoot.ID)
}
rootCA.IntermediateCerts = activeRoot.IntermediateCerts
s.setCAProvider(provider, rootCA)
return nil
}
// Get the highest index
idx, _, err := state.CARoots(nil)
if err != nil {
return err
}
// Store the root cert in raft
resp, err := s.raftApply(structs.ConnectCARequestType, &structs.CARequest{
Op: structs.CAOpSetRoots,
Index: idx,
Roots: []*structs.CARoot{rootCA},
})
if err != nil {
s.logger.Printf("[ERR] connect: Apply failed %v", err)
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
s.setCAProvider(provider, rootCA)
s.logger.Printf("[INFO] connect: initialized primary datacenter CA with provider %q", conf.Provider)
return nil
}
// initializeSecondaryCA runs the routine for generating an intermediate CA CSR and getting
// it signed by the primary DC if the root CA of the primary DC has changed since the last
// intermediate.
@ -252,8 +406,11 @@ func (s *Server) startConnectLeader() {
if s.config.ConnectEnabled && s.config.Datacenter != s.config.PrimaryDatacenter {
go s.secondaryCARootWatch(s.connectCh)
go s.replicateIntentions(s.connectCh)
}
go s.runCARootPruning(s.connectCh)
s.connectEnabled = true
}
@ -274,6 +431,75 @@ func (s *Server) stopConnectLeader() {
s.connectEnabled = false
}
func (s *Server) runCARootPruning(stopCh <-chan struct{}) {
ticker := time.NewTicker(caRootPruneInterval)
defer ticker.Stop()
for {
select {
case <-stopCh:
return
case <-ticker.C:
if err := s.pruneCARoots(); err != nil {
s.logger.Printf("[ERR] connect: error pruning CA roots: %v", err)
}
}
}
}
// pruneCARoots looks for any CARoots that have been rotated out and expired.
func (s *Server) pruneCARoots() error {
if !s.config.ConnectEnabled {
return nil
}
state := s.fsm.State()
idx, roots, err := state.CARoots(nil)
if err != nil {
return err
}
_, caConf, err := state.CAConfig()
if err != nil {
return err
}
common, err := caConf.GetCommonConfig()
if err != nil {
return err
}
var newRoots structs.CARoots
for _, r := range roots {
if !r.Active && !r.RotatedOutAt.IsZero() && time.Now().Sub(r.RotatedOutAt) > common.LeafCertTTL*2 {
s.logger.Printf("[INFO] connect: pruning old unused root CA (ID: %s)", r.ID)
continue
}
newRoot := *r
newRoots = append(newRoots, &newRoot)
}
// Return early if there's nothing to remove.
if len(newRoots) == len(roots) {
return nil
}
// Commit the new root state.
var args structs.CARequest
args.Op = structs.CAOpSetRoots
args.Index = idx
args.Roots = newRoots
resp, err := s.raftApply(structs.ConnectCARequestType, args)
if err != nil {
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
return nil
}
// secondaryCARootWatch maintains a blocking query to the primary datacenter's
// ConnectCA.Roots endpoint to monitor when it needs to request a new signed
// intermediate certificate.
@ -290,21 +516,25 @@ func (s *Server) secondaryCARootWatch(stopCh <-chan struct{}) {
retryLoopBackoff(stopCh, func() error {
var roots structs.IndexedCARoots
if err := s.forwardDC("ConnectCA.Roots", s.config.PrimaryDatacenter, &args, &roots); err != nil {
return err
return fmt.Errorf("Error retrieving the primary datacenter's roots: %v", err)
}
// Check to see if the primary has been upgraded in case we're waiting to switch to
// secondary mode.
provider, _ := s.getCAProvider()
if provider == nil {
// this happens when leadership is being revoked and this goroutine will be stopped
return nil
}
if !s.configuredSecondaryCA() {
primaryHasVersion, err := s.datacentersMeetMinVersion(minMultiDCConnectVersion)
if err != nil {
return err
versionOk, primaryFound := ServersInDCMeetMinimumVersion(s.WANMembers(), s.config.PrimaryDatacenter, minMultiDCConnectVersion)
if !primaryFound {
return fmt.Errorf("Primary datacenter is unreachable - deferring secondary CA initialization")
}
if primaryHasVersion {
if versionOk {
if err := s.initializeSecondaryProvider(provider, roots); err != nil {
return err
return fmt.Errorf("Failed to initialize secondary CA provider: %v", err)
}
}
}
@ -313,17 +543,14 @@ func (s *Server) secondaryCARootWatch(stopCh <-chan struct{}) {
// intermediate.
if s.configuredSecondaryCA() {
if err := s.initializeSecondaryCA(provider, roots); err != nil {
return err
return fmt.Errorf("Failed to initialize the secondary CA: %v", err)
}
}
args.QueryOptions.MinQueryIndex = nextIndexVal(args.QueryOptions.MinQueryIndex, roots.QueryMeta.Index)
return nil
}, func(err error) {
// Don't log the error if it's a result of the primary still starting up.
if err != errEmptyVersion {
s.logger.Printf("[ERR] connect: error watching primary datacenter roots: %v", err)
}
s.logger.Printf("[ERR] connect: %v", err)
})
}
@ -491,53 +718,6 @@ func nextIndexVal(prevIdx, idx uint64) uint64 {
return idx
}
// datacentersMeetMinVersion returns whether this datacenter and the primary
// are ready and have upgraded to at least the given version.
func (s *Server) datacentersMeetMinVersion(minVersion *version.Version) (bool, error) {
localAutopilotHealth := s.autopilot.GetClusterHealth()
localServersMeetVersion, err := autopilotServersMeetMinimumVersion(localAutopilotHealth.Servers, minVersion)
if err != nil {
return false, err
}
if !localServersMeetVersion {
return false, err
}
args := structs.DCSpecificRequest{
Datacenter: s.config.PrimaryDatacenter,
}
var reply autopilot.OperatorHealthReply
if err := s.forwardDC("Operator.ServerHealth", s.config.PrimaryDatacenter, &args, &reply); err != nil {
return false, err
}
remoteServersMeetVersion, err := autopilotServersMeetMinimumVersion(reply.Servers, minVersion)
if err != nil {
return false, err
}
return localServersMeetVersion && remoteServersMeetVersion, nil
}
// autopilotServersMeetMinimumVersion returns whether the given slice of servers
// meets a minimum version.
func autopilotServersMeetMinimumVersion(servers []autopilot.ServerHealth, minVersion *version.Version) (bool, error) {
for _, server := range servers {
if server.Version == "" {
return false, errEmptyVersion
}
version, err := version.NewVersion(server.Version)
if err != nil {
return false, fmt.Errorf("error parsing remote server version: %v", err)
}
if version.LessThan(minVersion) {
return false, nil
}
}
return true, nil
}
// initializeSecondaryProvider configures the given provider for a secondary, non-root datacenter.
func (s *Server) initializeSecondaryProvider(provider ca.Provider, roots structs.IndexedCARoots) error {
if roots.TrustDomain == "" {


@ -3,6 +3,7 @@ package consul
import (
"crypto/x509"
"os"
"reflect"
"strings"
"testing"
"time"
@ -13,6 +14,7 @@ import (
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc"
uuid "github.com/hashicorp/go-uuid"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -22,26 +24,38 @@ func TestLeader_SecondaryCA_Initialize(t *testing.T) {
require := require.New(t)
masterToken := "8a85f086-dd95-4178-b128-e10902767c5c"
// Initialize primary as the primary DC
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "primary"
c.PrimaryDatacenter = "primary"
c.Build = "1.4.0"
c.Build = "1.6.0"
c.ACLsEnabled = true
c.ACLMasterToken = masterToken
c.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
s1.tokens.UpdateAgentToken(masterToken, token.TokenSourceConfig)
testrpc.WaitForLeader(t, s1.RPC, "primary")
// secondary as a secondary DC
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "secondary"
c.PrimaryDatacenter = "primary"
c.Build = "1.4.0"
c.Build = "1.6.0"
c.ACLsEnabled = true
c.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
s2.tokens.UpdateAgentToken(masterToken, token.TokenSourceConfig)
s2.tokens.UpdateReplicationToken(masterToken, token.TokenSourceConfig)
// Create the WAN link
joinWAN(t, s2, s1)
testrpc.WaitForLeader(t, s2.RPC, "secondary")
@ -102,7 +116,7 @@ func TestLeader_SecondaryCA_IntermediateRefresh(t *testing.T) {
require := require.New(t)
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.Build = "1.4.0"
c.Build = "1.6.0"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
@ -113,7 +127,7 @@ func TestLeader_SecondaryCA_IntermediateRefresh(t *testing.T) {
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc2"
c.PrimaryDatacenter = "dc1"
c.Build = "1.4.0"
c.Build = "1.6.0"
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
@ -235,7 +249,7 @@ func TestLeader_SecondaryCA_TransitionFromPrimary(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
c.CAConfig.ClusterID = id1
c.Build = "1.4.0"
c.Build = "1.6.0"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
@ -249,7 +263,7 @@ func TestLeader_SecondaryCA_TransitionFromPrimary(t *testing.T) {
c.Datacenter = "dc2"
c.PrimaryDatacenter = "dc2"
c.CAConfig.ClusterID = id2
c.Build = "1.4.0"
c.Build = "1.6.0"
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
@ -333,7 +347,6 @@ func TestLeader_SecondaryCA_TransitionFromPrimary(t *testing.T) {
func TestLeader_SecondaryCA_UpgradeBeforePrimary(t *testing.T) {
t.Parallel()
require := require.New(t)
maxRootsQueryTime = 500 * time.Millisecond
// Initialize dc1 as the primary DC
@ -350,7 +363,7 @@ func TestLeader_SecondaryCA_UpgradeBeforePrimary(t *testing.T) {
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc2"
c.PrimaryDatacenter = "dc1"
c.Build = "1.4.0"
c.Build = "1.6.0"
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
@ -359,72 +372,59 @@ func TestLeader_SecondaryCA_UpgradeBeforePrimary(t *testing.T) {
joinWAN(t, s2, s1)
testrpc.WaitForLeader(t, s2.RPC, "dc2")
// Verify the root lists are different in each DC's state store.
var oldSecondaryRootID string
{
// ensure all the CA initialization stuff would have already been done
// this is necessary to ensure that not only has a leader been elected
// but that it has also finished its establishLeadership call
retry.Run(t, func(r *retry.R) {
require.True(r, s1.isReadyForConsistentReads())
require.True(r, s2.isReadyForConsistentReads())
})
// Verify the primary has a root (we faked its version too low but since it's the primary it ignores any version checks)
retry.Run(t, func(r *retry.R) {
state1 := s1.fsm.State()
_, roots1, err := state1.CARoots(nil)
require.NoError(err)
require.NoError(r, err)
require.Len(r, roots1, 1)
})
state2 := s2.fsm.State()
_, roots2, err := state2.CARoots(nil)
require.NoError(err)
require.Equal(1, len(roots1))
require.Equal(1, len(roots2))
require.NotEqual(roots1[0].ID, roots2[0].ID)
require.NotEqual(roots1[0].RootCert, roots2[0].RootCert)
oldSecondaryRootID = roots2[0].ID
}
// Verify the secondary does not have a root - defers initialization until the primary has been upgraded.
state2 := s2.fsm.State()
_, roots2, err := state2.CARoots(nil)
require.NoError(t, err)
require.Empty(t, roots2)
// Update the version on the fly so s2 kicks off the secondary DC transition.
tags := s1.config.SerfLANConfig.Tags
tags["build"] = "1.4.0"
s1.serfLAN.SetTags(tags)
tags := s1.config.SerfWANConfig.Tags
tags["build"] = "1.6.0"
s1.serfWAN.SetTags(tags)
// Wait for the secondary transition to happen and then verify the secondary DC
// has both roots present.
secondaryProvider, _ := s2.getCAProvider()
retry.Run(t, func(r *retry.R) {
state := s2.fsm.State()
_, roots, err := state.CARoots(nil)
r.Check(err)
if len(roots) != 2 {
r.Fatalf("should have 2 roots: %v", roots)
}
inter, err := secondaryProvider.ActiveIntermediate()
r.Check(err)
if inter == "" {
r.Fatal("should have valid intermediate")
}
})
{
state1 := s1.fsm.State()
_, roots1, err := state1.CARoots(nil)
require.NoError(err)
require.NoError(r, err)
require.Len(r, roots1, 1)
state2 := s2.fsm.State()
_, roots2, err := state2.CARoots(nil)
require.NoError(err)
require.Equal(1, len(roots1))
require.Equal(2, len(roots2))
var oldSecondaryRoot *structs.CARoot
var newSecondaryRoot *structs.CARoot
if roots2[0].ID == oldSecondaryRootID {
oldSecondaryRoot = roots2[0]
newSecondaryRoot = roots2[1]
} else {
oldSecondaryRoot = roots2[1]
newSecondaryRoot = roots2[0]
}
require.Equal(roots1[0].ID, newSecondaryRoot.ID)
require.Equal(roots1[0].RootCert, newSecondaryRoot.RootCert)
require.NotEqual(newSecondaryRoot.ID, oldSecondaryRoot.ID)
require.NotEqual(newSecondaryRoot.RootCert, oldSecondaryRoot.RootCert)
}
require.NoError(r, err)
require.Len(r, roots2, 1)
// ensure the roots are the same
require.Equal(r, roots1[0].ID, roots2[0].ID)
require.Equal(r, roots1[0].RootCert, roots2[0].RootCert)
inter, err := secondaryProvider.ActiveIntermediate()
require.NoError(r, err)
require.NotEmpty(r, inter, "should have valid intermediate")
})
_, caRoot := s1.getCAProvider()
intermediatePEM, err := secondaryProvider.ActiveIntermediate()
require.NoError(err)
require.NoError(t, err)
// Have dc2 sign a leaf cert and make sure the chain is correct.
spiffeService := &connect.SpiffeIDService{
@ -436,13 +436,13 @@ func TestLeader_SecondaryCA_UpgradeBeforePrimary(t *testing.T) {
raw, _ := connect.TestCSR(t, spiffeService)
leafCsr, err := connect.ParseCSR(raw)
require.NoError(err)
require.NoError(t, err)
leafPEM, err := secondaryProvider.Sign(leafCsr)
require.NoError(err)
require.NoError(t, err)
cert, err := connect.ParseCert(leafPEM)
require.NoError(err)
require.NoError(t, err)
// Check that the leaf signed by the new cert can be verified using the
// returned cert chain (signed intermediate + remote root).
@ -455,7 +455,7 @@ func TestLeader_SecondaryCA_UpgradeBeforePrimary(t *testing.T) {
Intermediates: intermediatePool,
Roots: rootPool,
})
require.NoError(err)
require.NoError(t, err)
}
func TestLeader_ReplicateIntentions(t *testing.T) {
@ -841,3 +841,189 @@ func TestLeader_GenerateCASignRequest(t *testing.T) {
req := s.generateCASignRequest(csr)
assert.Equal(t, "east", req.RequestDatacenter())
}
func TestLeader_CARootPruning(t *testing.T) {
t.Parallel()
caRootPruneInterval = 200 * time.Millisecond
require := require.New(t)
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
// Get the current root
rootReq := &structs.DCSpecificRequest{
Datacenter: "dc1",
}
var rootList structs.IndexedCARoots
require.Nil(msgpackrpc.CallWithCodec(codec, "ConnectCA.Roots", rootReq, &rootList))
require.Len(rootList.Roots, 1)
oldRoot := rootList.Roots[0]
// Update the provider config to use a new private key, which should
// cause a rotation.
_, newKey, err := connect.GeneratePrivateKey()
require.NoError(err)
newConfig := &structs.CAConfiguration{
Provider: "consul",
Config: map[string]interface{}{
"LeafCertTTL": "500ms",
"PrivateKey": newKey,
"RootCert": "",
"RotationPeriod": "2160h",
"SkipValidate": true,
},
}
{
args := &structs.CARequest{
Datacenter: "dc1",
Config: newConfig,
}
var reply interface{}
require.NoError(msgpackrpc.CallWithCodec(codec, "ConnectCA.ConfigurationSet", args, &reply))
}
// Should have 2 roots now.
_, roots, err := s1.fsm.State().CARoots(nil)
require.NoError(err)
require.Len(roots, 2)
time.Sleep(2 * time.Second)
// Now the old root should be pruned.
_, roots, err = s1.fsm.State().CARoots(nil)
require.NoError(err)
require.Len(roots, 1)
require.True(roots[0].Active)
require.NotEqual(roots[0].ID, oldRoot.ID)
}
func TestLeader_PersistIntermediateCAs(t *testing.T) {
t.Parallel()
require := require.New(t)
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
dir2, s2 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir3)
defer s3.Shutdown()
joinLAN(t, s2, s1)
joinLAN(t, s3, s1)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
// Get the current root
rootReq := &structs.DCSpecificRequest{
Datacenter: "dc1",
}
var rootList structs.IndexedCARoots
require.Nil(msgpackrpc.CallWithCodec(codec, "ConnectCA.Roots", rootReq, &rootList))
require.Len(rootList.Roots, 1)
// Update the provider config to use a new private key, which should
// cause a rotation.
_, newKey, err := connect.GeneratePrivateKey()
require.NoError(err)
newConfig := &structs.CAConfiguration{
Provider: "consul",
Config: map[string]interface{}{
"PrivateKey": newKey,
"RootCert": "",
"RotationPeriod": 90 * 24 * time.Hour,
},
}
{
args := &structs.CARequest{
Datacenter: "dc1",
Config: newConfig,
}
var reply interface{}
require.NoError(msgpackrpc.CallWithCodec(codec, "ConnectCA.ConfigurationSet", args, &reply))
}
// Get the active root before leader change.
_, root := s1.getCAProvider()
require.Len(root.IntermediateCerts, 1)
// Force a leader change and make sure the root CA values are preserved.
s1.Leave()
s1.Shutdown()
retry.Run(t, func(r *retry.R) {
var leader *Server
for _, s := range []*Server{s2, s3} {
if s.IsLeader() {
leader = s
break
}
}
if leader == nil {
r.Fatal("no leader")
}
_, newLeaderRoot := leader.getCAProvider()
if !reflect.DeepEqual(newLeaderRoot, root) {
r.Fatalf("got %v, want %v", newLeaderRoot, root)
}
})
}
func TestLeader_ParseCARoot(t *testing.T) {
type test struct {
pem string
expectedError bool
}
tests := []test{
{"", true},
{`-----BEGIN CERTIFICATE-----
MIIDHDCCAsKgAwIBAgIQS+meruRVzrmVwEhXNrtk9jAKBggqhkjOPQQDAjCBuTEL
MAkGA1UEBhMCVVMxCzAJBgNVBAgTAkNBMRYwFAYDVQQHEw1TYW4gRnJhbmNpc2Nv
MRowGAYDVQQJExExMDEgU2Vjb25kIFN0cmVldDEOMAwGA1UEERMFOTQxMDUxFzAV
BgNVBAoTDkhhc2hpQ29ycCBJbmMuMUAwPgYDVQQDEzdDb25zdWwgQWdlbnQgQ0Eg
MTkzNzYxNzQwMjcxNzUxOTkyMzAyMzE1NDkxNjUzODYyMzAwNzE3MB4XDTE5MDQx
MjA5MTg0NVoXDTIwMDQxMTA5MTg0NVowHDEaMBgGA1UEAxMRY2xpZW50LmRjMS5j
b25zdWwwWTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAAS2UroGUh5k7eR//iPsn9ne
CMCVsERnjqQnK6eDWnM5kTXgXcPPe5pcAS9xs0g8BZ+oVsJSc7sH6RYvX+gw6bCl
o4IBRjCCAUIwDgYDVR0PAQH/BAQDAgWgMB0GA1UdJQQWMBQGCCsGAQUFBwMCBggr
BgEFBQcDATAMBgNVHRMBAf8EAjAAMGgGA1UdDgRhBF84NDphNDplZjoxYTpjODo1
MzoxMDo1YTpjNTplYTpjZTphYTowZDo2ZjpjOTozODozZDphZjo0NTphZTo5OTo4
YzpiYjoyNzpiYzpiMzpmYTpmMDozMToxNDo4ZTozNDBqBgNVHSMEYzBhgF8yYTox
MjpjYTo0Mzo0NzowODpiZjoxYTo0Yjo4MTpkNDo2MzowNTo1ODowZToxYzo3Zjoy
NTo0ZjozNDpmNDozYjpmYzo5YTpkNzo4Mjo2YjpkYzpmODo3YjphMTo5ZDAtBgNV
HREEJjAkghFjbGllbnQuZGMxLmNvbnN1bIIJbG9jYWxob3N0hwR/AAABMAoGCCqG
SM49BAMCA0gAMEUCIHcLS74KSQ7RA+edwOprmkPTh1nolwXz9/y9CJ5nMVqEAiEA
h1IHCbxWsUT3AiARwj5/D/CUppy6BHIFkvcpOCQoVyo=
-----END CERTIFICATE-----`, false},
}
for _, test := range tests {
root, err := parseCARoot(test.pem, "consul", "cluster")
if err == nil && test.expectedError {
require.Error(t, err)
}
if test.pem != "" {
rootCert, err := connect.ParseCert(test.pem)
require.NoError(t, err)
// just to make sure these two are not the same
require.NotEqual(t, rootCert.AuthorityKeyId, rootCert.SubjectKeyId)
require.Equal(t, connect.HexString(rootCert.SubjectKeyId), root.SigningKeyID)
}
}
}


@ -2,11 +2,9 @@ package consul
import (
"os"
"reflect"
"testing"
"time"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/sdk/testutil/retry"
@ -1066,148 +1064,6 @@ func TestLeader_ACL_Initialization(t *testing.T) {
}
}
func TestLeader_CARootPruning(t *testing.T) {
t.Parallel()
caRootPruneInterval = 200 * time.Millisecond
require := require.New(t)
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
// Get the current root
rootReq := &structs.DCSpecificRequest{
Datacenter: "dc1",
}
var rootList structs.IndexedCARoots
require.Nil(msgpackrpc.CallWithCodec(codec, "ConnectCA.Roots", rootReq, &rootList))
require.Len(rootList.Roots, 1)
oldRoot := rootList.Roots[0]
// Update the provider config to use a new private key, which should
// cause a rotation.
_, newKey, err := connect.GeneratePrivateKey()
require.NoError(err)
newConfig := &structs.CAConfiguration{
Provider: "consul",
Config: map[string]interface{}{
"LeafCertTTL": "500ms",
"PrivateKey": newKey,
"RootCert": "",
"RotationPeriod": "2160h",
"SkipValidate": true,
},
}
{
args := &structs.CARequest{
Datacenter: "dc1",
Config: newConfig,
}
var reply interface{}
require.NoError(msgpackrpc.CallWithCodec(codec, "ConnectCA.ConfigurationSet", args, &reply))
}
// Should have 2 roots now.
_, roots, err := s1.fsm.State().CARoots(nil)
require.NoError(err)
require.Len(roots, 2)
time.Sleep(2 * time.Second)
// Now the old root should be pruned.
_, roots, err = s1.fsm.State().CARoots(nil)
require.NoError(err)
require.Len(roots, 1)
require.True(roots[0].Active)
require.NotEqual(roots[0].ID, oldRoot.ID)
}
func TestLeader_PersistIntermediateCAs(t *testing.T) {
t.Parallel()
require := require.New(t)
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
dir2, s2 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir3)
defer s3.Shutdown()
joinLAN(t, s2, s1)
joinLAN(t, s3, s1)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
// Get the current root
rootReq := &structs.DCSpecificRequest{
Datacenter: "dc1",
}
var rootList structs.IndexedCARoots
require.Nil(msgpackrpc.CallWithCodec(codec, "ConnectCA.Roots", rootReq, &rootList))
require.Len(rootList.Roots, 1)
// Update the provider config to use a new private key, which should
// cause a rotation.
_, newKey, err := connect.GeneratePrivateKey()
require.NoError(err)
newConfig := &structs.CAConfiguration{
Provider: "consul",
Config: map[string]interface{}{
"PrivateKey": newKey,
"RootCert": "",
"RotationPeriod": 90 * 24 * time.Hour,
},
}
{
args := &structs.CARequest{
Datacenter: "dc1",
Config: newConfig,
}
var reply interface{}
require.NoError(msgpackrpc.CallWithCodec(codec, "ConnectCA.ConfigurationSet", args, &reply))
}
// Get the active root before leader change.
_, root := s1.getCAProvider()
require.Len(root.IntermediateCerts, 1)
// Force a leader change and make sure the root CA values are preserved.
s1.Leave()
s1.Shutdown()
retry.Run(t, func(r *retry.R) {
var leader *Server
for _, s := range []*Server{s2, s3} {
if s.IsLeader() {
leader = s
break
}
}
if leader == nil {
r.Fatal("no leader")
}
_, newLeaderRoot := leader.getCAProvider()
if !reflect.DeepEqual(newLeaderRoot, root) {
r.Fatalf("got %v, want %v", newLeaderRoot, root)
}
})
}
func TestLeader_ACLUpgrade(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
@ -1302,47 +1158,3 @@ func TestLeader_ConfigEntryBootstrap(t *testing.T) {
require.Equal(t, global_entry_init.Config, global.Config)
})
}
func TestLeader_ParseCARoot(t *testing.T) {
type test struct {
pem string
expectedError bool
}
tests := []test{
{"", true},
{`-----BEGIN CERTIFICATE-----
MIIDHDCCAsKgAwIBAgIQS+meruRVzrmVwEhXNrtk9jAKBggqhkjOPQQDAjCBuTEL
MAkGA1UEBhMCVVMxCzAJBgNVBAgTAkNBMRYwFAYDVQQHEw1TYW4gRnJhbmNpc2Nv
MRowGAYDVQQJExExMDEgU2Vjb25kIFN0cmVldDEOMAwGA1UEERMFOTQxMDUxFzAV
BgNVBAoTDkhhc2hpQ29ycCBJbmMuMUAwPgYDVQQDEzdDb25zdWwgQWdlbnQgQ0Eg
MTkzNzYxNzQwMjcxNzUxOTkyMzAyMzE1NDkxNjUzODYyMzAwNzE3MB4XDTE5MDQx
MjA5MTg0NVoXDTIwMDQxMTA5MTg0NVowHDEaMBgGA1UEAxMRY2xpZW50LmRjMS5j
b25zdWwwWTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAAS2UroGUh5k7eR//iPsn9ne
CMCVsERnjqQnK6eDWnM5kTXgXcPPe5pcAS9xs0g8BZ+oVsJSc7sH6RYvX+gw6bCl
o4IBRjCCAUIwDgYDVR0PAQH/BAQDAgWgMB0GA1UdJQQWMBQGCCsGAQUFBwMCBggr
BgEFBQcDATAMBgNVHRMBAf8EAjAAMGgGA1UdDgRhBF84NDphNDplZjoxYTpjODo1
MzoxMDo1YTpjNTplYTpjZTphYTowZDo2ZjpjOTozODozZDphZjo0NTphZTo5OTo4
YzpiYjoyNzpiYzpiMzpmYTpmMDozMToxNDo4ZTozNDBqBgNVHSMEYzBhgF8yYTox
MjpjYTo0Mzo0NzowODpiZjoxYTo0Yjo4MTpkNDo2MzowNTo1ODowZToxYzo3Zjoy
NTo0ZjozNDpmNDozYjpmYzo5YTpkNzo4Mjo2YjpkYzpmODo3YjphMTo5ZDAtBgNV
HREEJjAkghFjbGllbnQuZGMxLmNvbnN1bIIJbG9jYWxob3N0hwR/AAABMAoGCCqG
SM49BAMCA0gAMEUCIHcLS74KSQ7RA+edwOprmkPTh1nolwXz9/y9CJ5nMVqEAiEA
h1IHCbxWsUT3AiARwj5/D/CUppy6BHIFkvcpOCQoVyo=
-----END CERTIFICATE-----`, false},
}
for _, test := range tests {
root, err := parseCARoot(test.pem, "consul", "cluster")
if err == nil && test.expectedError {
require.Error(t, err)
}
if test.pem != "" {
rootCert, err := connect.ParseCert(test.pem)
require.NoError(t, err)
// just to make sure these two are not the same
require.NotEqual(t, rootCert.AuthorityKeyId, rootCert.SubjectKeyId)
require.Equal(t, connect.HexString(rootCert.SubjectKeyId), root.SigningKeyID)
}
}
}


@ -142,12 +142,6 @@ type Server struct {
caProviderRoot *structs.CARoot
caProviderLock sync.RWMutex
// caPruningCh is used to shut down the CA root pruning goroutine when we
// lose leadership.
caPruningCh chan struct{}
caPruningLock sync.RWMutex
caPruningEnabled bool
// Consul configuration
config *Config


@ -276,9 +276,33 @@ func runtimeStats() map[string]string {
// ServersMeetMinimumVersion returns whether the given alive servers are at least on the
// given Consul version
func ServersMeetMinimumVersion(members []serf.Member, minVersion *version.Version) bool {
return ServersMeetRequirements(members, func(srv *metadata.Server) bool {
return srv.Status != serf.StatusAlive || !srv.Build.LessThan(minVersion)
})
}
// ServersInDCMeetMinimumVersion returns whether the given alive servers from a particular
// datacenter are at least on the given Consul version. This requires at least one alive server in the DC.
func ServersInDCMeetMinimumVersion(members []serf.Member, datacenter string, minVersion *version.Version) (bool, bool) {
found := false
ok := ServersMeetRequirements(members, func(srv *metadata.Server) bool {
if srv.Status != serf.StatusAlive || srv.Datacenter != datacenter {
return true
}
found = true
return !srv.Build.LessThan(minVersion)
})
return ok, found
}
// ServersMeetRequirements returns whether the given server members meet the requirements as defined by the
// callback function
func ServersMeetRequirements(members []serf.Member, meetsRequirements func(*metadata.Server) bool) bool {
for _, member := range members {
if valid, parts := metadata.IsConsulServer(member); valid && parts.Status == serf.StatusAlive {
if parts.Build.LessThan(minVersion) {
if valid, parts := metadata.IsConsulServer(member); valid {
if !meetsRequirements(parts) {
return false
}
}


@ -405,6 +405,94 @@ func TestServersMeetMinimumVersion(t *testing.T) {
}
}
func TestServersInDCMeetMinimumVersion(t *testing.T) {
t.Parallel()
makeMember := func(version string, datacenter string) serf.Member {
return serf.Member{
Name: "foo",
Addr: net.IP([]byte{127, 0, 0, 1}),
Tags: map[string]string{
"role": "consul",
"id": "asdf",
"dc": datacenter,
"port": "10000",
"build": version,
"wan_join_port": "1234",
"vsn": "1",
"expect": "3",
"raft_vsn": "3",
},
Status: serf.StatusAlive,
}
}
cases := []struct {
members []serf.Member
ver *version.Version
expected bool
expectedFound bool
}{
// One server in the primary DC, meets reqs
{
members: []serf.Member{
makeMember("0.7.5", "primary"),
makeMember("0.7.3", "secondary"),
},
ver: version.Must(version.NewVersion("0.7.5")),
expected: true,
expectedFound: true,
},
// One server in the primary DC, doesn't meet reqs
{
members: []serf.Member{
makeMember("0.7.5", "primary"),
makeMember("0.8.1", "secondary"),
},
ver: version.Must(version.NewVersion("0.8.0")),
expected: false,
expectedFound: true,
},
// Multiple servers in the primary DC, all meet req version
{
members: []serf.Member{
makeMember("0.7.5", "primary"),
makeMember("0.8.0", "primary"),
makeMember("0.7.0", "secondary"),
},
ver: version.Must(version.NewVersion("0.7.5")),
expected: true,
expectedFound: true,
},
// Multiple servers in the primary DC, one doesn't meet req version
{
members: []serf.Member{
makeMember("0.7.5", "primary"),
makeMember("0.8.0", "primary"),
makeMember("0.9.1", "secondary"),
},
ver: version.Must(version.NewVersion("0.8.0")),
expected: false,
expectedFound: true,
},
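// No alive servers in the primary DC - the version check passes trivially but the primary is reported as not found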
{
members: []serf.Member{
makeMember("0.7.5", "secondary"),
makeMember("0.8.0", "secondary"),
makeMember("0.9.1", "secondary"),
},
ver: version.Must(version.NewVersion("0.7.0")),
expected: true,
expectedFound: false,
},
}
for _, tc := range cases {
result, found := ServersInDCMeetMinimumVersion(tc.members, "primary", tc.ver)
require.Equal(t, tc.expected, result)
require.Equal(t, tc.expectedFound, found)
}
}
func TestInterpolateHIL(t *testing.T) {
for _, test := range []struct {
name string