leader: move the virtual IP version check into a goroutine

This commit is contained in:
Kyle Havlovitz 2021-12-09 16:40:26 -08:00
parent 74eb257b1c
commit 04ef1c3fa0
4 changed files with 63 additions and 30 deletions

View File

@ -55,8 +55,6 @@ var (
// minCentralizedConfigVersion is the minimum Consul version in which centralized
// config is supported
minCentralizedConfigVersion = version.Must(version.NewVersion("1.5.0"))
minVirtualIPVersion = version.Must(version.NewVersion("1.11.0"))
)
// monitorLeadership is used to monitor if we acquire or lose our role
@ -188,10 +186,6 @@ RECONCILE:
s.logger.Error("failed to reconcile", "error", err)
goto WAIT
}
if err := s.setVirtualIPFlag(); err != nil {
s.logger.Error("failed to set virtual IP flag", "error", err)
goto WAIT
}
// Initial reconcile worked, now we can process the channel
// updates
@ -219,7 +213,6 @@ WAIT:
goto RECONCILE
case member := <-reconcileCh:
s.reconcileMember(member)
s.setVirtualIPFlag()
case index := <-s.tombstoneGC.ExpireCh():
go s.reapTombstones(index)
case errCh := <-s.reassertLeaderCh:
@ -322,10 +315,6 @@ func (s *Server) establishLeadership(ctx context.Context) error {
return err
}
if err := s.setVirtualIPFlag(); err != nil {
return err
}
s.setConsistentReadReady()
s.logger.Debug("successfully established leadership", "duration", time.Since(start))
@ -892,25 +881,6 @@ func (s *Server) bootstrapConfigEntries(entries []structs.ConfigEntry) error {
return nil
}
func (s *Server) setVirtualIPFlag() error {
// Return early if the flag is already set.
val, err := s.getSystemMetadata(structs.SystemMetadataVirtualIPsEnabled)
if err != nil {
return err
}
if val != "" {
return nil
}
if ok, _ := ServersInDCMeetMinimumVersion(s, s.config.Datacenter, minVirtualIPVersion); !ok {
s.logger.Warn(fmt.Sprintf("can't allocate Virtual IPs until all servers >= %s",
minVirtualIPVersion.String()))
return nil
}
return s.setSystemMetadataKey(structs.SystemMetadataVirtualIPsEnabled, "true")
}
// reconcileReaped is used to reconcile nodes that have failed and been reaped
// from Serf but remain in the catalog. This is done by looking for unknown nodes with serfHealth checks registered.
// We generate a "reap" event to cause the node to be cleaned up.

View File

@ -2,12 +2,14 @@ package consul
import (
"context"
"fmt"
"time"
"golang.org/x/time/rate"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/go-version"
)
const (
@ -24,6 +26,14 @@ var (
// maxRetryBackoff is the maximum number of seconds to wait between failed blocking
// queries when backing off.
maxRetryBackoff = 256
// minVirtualIPVersion is the minimum version for all Consul servers for virtual IP
// assignment to be enabled.
minVirtualIPVersion = version.Must(version.NewVersion("1.11.0"))
// virtualIPVersionCheckInterval is the frequency we check whether all servers meet
// the minimum version to enable virtual IP assignment for services.
virtualIPVersionCheckInterval = time.Minute
)
// startConnectLeader starts multi-dc connect leader routines.
@ -36,6 +46,7 @@ func (s *Server) startConnectLeader(ctx context.Context) error {
s.leaderRoutineManager.Start(ctx, caRootPruningRoutineName, s.runCARootPruning)
s.leaderRoutineManager.Start(ctx, caRootMetricRoutineName, rootCAExpiryMonitor(s).Monitor)
s.leaderRoutineManager.Start(ctx, caSigningMetricRoutineName, signingCAExpiryMonitor(s).Monitor)
s.leaderRoutineManager.Start(ctx, virtualIPCheckRoutineName, s.runVirtualIPVersionCheck)
return s.startIntentionConfigEntryMigration(ctx)
}
@ -47,6 +58,7 @@ func (s *Server) stopConnectLeader() {
s.leaderRoutineManager.Stop(caRootPruningRoutineName)
s.leaderRoutineManager.Stop(caRootMetricRoutineName)
s.leaderRoutineManager.Stop(caSigningMetricRoutineName)
s.leaderRoutineManager.Stop(virtualIPCheckRoutineName)
}
func (s *Server) runCARootPruning(ctx context.Context) error {
@ -111,6 +123,54 @@ func (s *Server) pruneCARoots() error {
return err
}
func (s *Server) runVirtualIPVersionCheck(ctx context.Context) error {
// Return early if the flag is already set.
_, err := s.setVirtualIPVersionFlag()
if err != nil {
s.loggers.Named(logging.Connect).Warn("error enabling virtual IPs", "error", err)
}
ticker := time.NewTicker(virtualIPVersionCheckInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
ok, err := s.setVirtualIPVersionFlag()
if err != nil {
s.loggers.Named(logging.Connect).Warn("error enabling virtual IPs", "error", err)
continue
}
if ok {
return nil
}
}
}
}
func (s *Server) setVirtualIPVersionFlag() (bool, error) {
val, err := s.getSystemMetadata(structs.SystemMetadataVirtualIPsEnabled)
if err != nil {
return false, err
}
if val != "" {
return true, nil
}
if ok, _ := ServersInDCMeetMinimumVersion(s, s.config.Datacenter, minVirtualIPVersion); !ok {
return false, fmt.Errorf("can't allocate Virtual IPs until all servers >= %s",
minVirtualIPVersion.String())
}
if err := s.setSystemMetadataKey(structs.SystemMetadataVirtualIPsEnabled, "true"); err != nil {
return false, nil
}
return true, nil
}
// retryLoopBackoff loops a given function indefinitely, backing off exponentially
// upon errors up to a maximum of maxRetryBackoff seconds.
func retryLoopBackoff(ctx context.Context, loopFn func() error, errFn func(error)) {

View File

@ -2126,6 +2126,8 @@ func TestLeader_EnableVirtualIPs(t *testing.T) {
t.Skip("too slow for testing.Short")
}
virtualIPVersionCheckInterval = 50 * time.Millisecond
conf := func(c *Config) {
c.Bootstrap = false
c.BootstrapExpect = 3

View File

@ -116,6 +116,7 @@ const (
secondaryCARootWatchRoutineName = "secondary CA roots watch"
intermediateCertRenewWatchRoutineName = "intermediate cert renew watch"
backgroundCAInitializationRoutineName = "CA initialization"
virtualIPCheckRoutineName = "virtual IP version check"
)
var (