mirror of https://github.com/status-im/consul.git
Implement Leader Routine Management (#6580)
* Implement leader routine manager Switch over the following to use it for go routine management: • Config entry Replication • ACL replication - tokens, policies, roles and legacy tokens • ACL legacy token upgrade • ACL token reaping • Intention Replication • Secondary CA Roots Watching • CA Root Pruning Also added the StopAll call into the Server Shutdown method to ensure all leader routines get killed off when shutting down. This should be mostly unnecessary as `revokeLeadership` should manually stop each one but just in case we really want these to go away (eventually).
This commit is contained in:
parent
28221f66f2
commit
d65bbbfd4e
|
@ -9,33 +9,11 @@ import (
|
|||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
func (s *Server) startACLTokenReaping() {
|
||||
s.aclTokenReapLock.Lock()
|
||||
defer s.aclTokenReapLock.Unlock()
|
||||
|
||||
if s.aclTokenReapEnabled {
|
||||
return
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
s.aclTokenReapCancel = cancel
|
||||
|
||||
// Do a quick check for config settings that would imply the goroutine
|
||||
// below will just spin forever.
|
||||
//
|
||||
// We can only check the config settings here that cannot change without a
|
||||
// restart, so we omit the check for a non-empty replication token as that
|
||||
// can be changed at runtime.
|
||||
if !s.InACLDatacenter() && !s.config.ACLTokenReplication {
|
||||
return
|
||||
}
|
||||
|
||||
go func() {
|
||||
func (s *Server) reapExpiredTokens(ctx context.Context) error {
|
||||
limiter := rate.NewLimiter(aclTokenReapingRateLimit, aclTokenReapingBurst)
|
||||
|
||||
for {
|
||||
if err := limiter.Wait(ctx); err != nil {
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
if s.LocalTokensEnabled() {
|
||||
|
@ -49,22 +27,24 @@ func (s *Server) startACLTokenReaping() {
|
|||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
s.aclTokenReapEnabled = true
|
||||
}
|
||||
|
||||
func (s *Server) stopACLTokenReaping() {
|
||||
s.aclTokenReapLock.Lock()
|
||||
defer s.aclTokenReapLock.Unlock()
|
||||
|
||||
if !s.aclTokenReapEnabled {
|
||||
func (s *Server) startACLTokenReaping() {
|
||||
// Do a quick check for config settings that would imply the goroutine
|
||||
// below will just spin forever.
|
||||
//
|
||||
// We can only check the config settings here that cannot change without a
|
||||
// restart, so we omit the check for a non-empty replication token as that
|
||||
// can be changed at runtime.
|
||||
if !s.InACLDatacenter() && !s.config.ACLTokenReplication {
|
||||
return
|
||||
}
|
||||
|
||||
s.aclTokenReapCancel()
|
||||
s.aclTokenReapCancel = nil
|
||||
s.aclTokenReapEnabled = false
|
||||
s.leaderRoutineManager.Start(aclTokenReapingRoutineName, s.reapExpiredTokens)
|
||||
}
|
||||
|
||||
func (s *Server) stopACLTokenReaping() {
|
||||
s.leaderRoutineManager.Stop(aclTokenReapingRoutineName)
|
||||
}
|
||||
|
||||
func (s *Server) reapExpiredGlobalACLTokens() (int, error) {
|
||||
|
|
|
@ -649,22 +649,13 @@ func (s *Server) initializeACLs(upgrade bool) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) startACLUpgrade() {
|
||||
s.aclUpgradeLock.Lock()
|
||||
defer s.aclUpgradeLock.Unlock()
|
||||
|
||||
if s.aclUpgradeEnabled {
|
||||
return
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
s.aclUpgradeCancel = cancel
|
||||
|
||||
go func() {
|
||||
// This function is only intended to be run as a managed go routine, it will block until
|
||||
// the context passed in indicates that it should exit.
|
||||
func (s *Server) legacyACLTokenUpgrade(ctx context.Context) error {
|
||||
limiter := rate.NewLimiter(aclUpgradeRateLimit, int(aclUpgradeRateLimit))
|
||||
for {
|
||||
if err := limiter.Wait(ctx); err != nil {
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
// actually run the upgrade here
|
||||
|
@ -733,43 +724,30 @@ func (s *Server) startACLUpgrade() {
|
|||
s.logger.Printf("[ERR] acl: failed to apply acl token upgrade batch: %v", err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
s.aclUpgradeEnabled = true
|
||||
func (s *Server) startACLUpgrade() {
|
||||
if s.config.PrimaryDatacenter != s.config.Datacenter {
|
||||
// token upgrades should only run in the primary
|
||||
return
|
||||
}
|
||||
|
||||
s.leaderRoutineManager.Start(aclUpgradeRoutineName, s.legacyACLTokenUpgrade)
|
||||
}
|
||||
|
||||
func (s *Server) stopACLUpgrade() {
|
||||
s.aclUpgradeLock.Lock()
|
||||
defer s.aclUpgradeLock.Unlock()
|
||||
|
||||
if !s.aclUpgradeEnabled {
|
||||
return
|
||||
}
|
||||
|
||||
s.aclUpgradeCancel()
|
||||
s.aclUpgradeCancel = nil
|
||||
s.aclUpgradeEnabled = false
|
||||
s.leaderRoutineManager.Stop(aclUpgradeRoutineName)
|
||||
}
|
||||
|
||||
func (s *Server) startLegacyACLReplication() {
|
||||
s.aclReplicationLock.Lock()
|
||||
defer s.aclReplicationLock.Unlock()
|
||||
|
||||
if s.aclReplicationEnabled {
|
||||
return
|
||||
}
|
||||
|
||||
s.initReplicationStatus()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
s.aclReplicationCancel = cancel
|
||||
|
||||
go func() {
|
||||
// This function is only intended to be run as a managed go routine, it will block until
|
||||
// the context passed in indicates that it should exit.
|
||||
func (s *Server) runLegacyACLReplication(ctx context.Context) error {
|
||||
var lastRemoteIndex uint64
|
||||
limiter := rate.NewLimiter(rate.Limit(s.config.ACLReplicationRate), s.config.ACLReplicationBurst)
|
||||
|
||||
for {
|
||||
if err := limiter.Wait(ctx); err != nil {
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
if s.tokens.ReplicationToken() == "" {
|
||||
|
@ -778,7 +756,7 @@ func (s *Server) startLegacyACLReplication() {
|
|||
|
||||
index, exit, err := s.replicateLegacyACLs(lastRemoteIndex, ctx)
|
||||
if exit {
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
|
@ -791,48 +769,84 @@ func (s *Server) startLegacyACLReplication() {
|
|||
s.logger.Printf("[DEBUG] consul: Legacy ACL replication completed through remote index %d", index)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
s.updateACLReplicationStatusRunning(structs.ACLReplicateLegacy)
|
||||
s.aclReplicationEnabled = true
|
||||
}
|
||||
|
||||
func (s *Server) startACLReplication() {
|
||||
s.aclReplicationLock.Lock()
|
||||
defer s.aclReplicationLock.Unlock()
|
||||
func (s *Server) startLegacyACLReplication() {
|
||||
if s.InACLDatacenter() {
|
||||
return
|
||||
}
|
||||
|
||||
if s.aclReplicationEnabled {
|
||||
// unlike some other leader routines this initializes some extra state
|
||||
// and therefore we want to prevent re-initialization if things are already
|
||||
// running
|
||||
if s.leaderRoutineManager.IsRunning(legacyACLReplicationRoutineName) {
|
||||
return
|
||||
}
|
||||
|
||||
s.initReplicationStatus()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
s.aclReplicationCancel = cancel
|
||||
|
||||
s.startACLReplicator(ctx, structs.ACLReplicatePolicies, s.replicateACLPolicies)
|
||||
s.startACLReplicator(ctx, structs.ACLReplicateRoles, s.replicateACLRoles)
|
||||
s.leaderRoutineManager.Start(legacyACLReplicationRoutineName, s.runLegacyACLReplication)
|
||||
s.logger.Printf("[INFO] acl: started legacy ACL replication")
|
||||
s.updateACLReplicationStatusRunning(structs.ACLReplicateLegacy)
|
||||
}
|
||||
|
||||
func (s *Server) startACLReplication() {
|
||||
if s.InACLDatacenter() {
|
||||
return
|
||||
}
|
||||
|
||||
// unlike some other leader routines this initializes some extra state
|
||||
// and therefore we want to prevent re-initialization if things are already
|
||||
// running
|
||||
if s.leaderRoutineManager.IsRunning(aclPolicyReplicationRoutineName) {
|
||||
return
|
||||
}
|
||||
|
||||
s.initReplicationStatus()
|
||||
s.leaderRoutineManager.Start(aclPolicyReplicationRoutineName, s.runACLPolicyReplicator)
|
||||
s.leaderRoutineManager.Start(aclRoleReplicationRoutineName, s.runACLRoleReplicator)
|
||||
|
||||
if s.config.ACLTokenReplication {
|
||||
s.startACLReplicator(ctx, structs.ACLReplicateTokens, s.replicateACLTokens)
|
||||
s.leaderRoutineManager.Start(aclTokenReplicationRoutineName, s.runACLTokenReplicator)
|
||||
s.updateACLReplicationStatusRunning(structs.ACLReplicateTokens)
|
||||
} else {
|
||||
s.updateACLReplicationStatusRunning(structs.ACLReplicatePolicies)
|
||||
}
|
||||
|
||||
s.aclReplicationEnabled = true
|
||||
}
|
||||
|
||||
type replicateFunc func(ctx context.Context, lastRemoteIndex uint64) (uint64, bool, error)
|
||||
|
||||
func (s *Server) startACLReplicator(ctx context.Context, replicationType structs.ACLReplicationType, replicateFunc replicateFunc) {
|
||||
go func() {
|
||||
// This function is only intended to be run as a managed go routine, it will block until
|
||||
// the context passed in indicates that it should exit.
|
||||
func (s *Server) runACLPolicyReplicator(ctx context.Context) error {
|
||||
s.logger.Printf("[INFO] acl: started ACL Policy replication")
|
||||
|
||||
return s.runACLReplicator(ctx, structs.ACLReplicatePolicies, s.replicateACLPolicies)
|
||||
}
|
||||
|
||||
// This function is only intended to be run as a managed go routine, it will block until
|
||||
// the context passed in indicates that it should exit.
|
||||
func (s *Server) runACLRoleReplicator(ctx context.Context) error {
|
||||
s.logger.Printf("[INFO] acl: started ACL Role replication")
|
||||
return s.runACLReplicator(ctx, structs.ACLReplicateRoles, s.replicateACLRoles)
|
||||
}
|
||||
|
||||
// This function is only intended to be run as a managed go routine, it will block until
|
||||
// the context passed in indicates that it should exit.
|
||||
func (s *Server) runACLTokenReplicator(ctx context.Context) error {
|
||||
return s.runACLReplicator(ctx, structs.ACLReplicateTokens, s.replicateACLTokens)
|
||||
}
|
||||
|
||||
// This function is only intended to be run as a managed go routine, it will block until
|
||||
// the context passed in indicates that it should exit.
|
||||
func (s *Server) runACLReplicator(ctx context.Context, replicationType structs.ACLReplicationType, replicateFunc replicateFunc) error {
|
||||
var failedAttempts uint
|
||||
limiter := rate.NewLimiter(rate.Limit(s.config.ACLReplicationRate), s.config.ACLReplicationBurst)
|
||||
|
||||
var lastRemoteIndex uint64
|
||||
for {
|
||||
if err := limiter.Wait(ctx); err != nil {
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
if s.tokens.ReplicationToken() == "" {
|
||||
|
@ -841,7 +855,7 @@ func (s *Server) startACLReplicator(ctx context.Context, replicationType structs
|
|||
|
||||
index, exit, err := replicateFunc(ctx, lastRemoteIndex)
|
||||
if exit {
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
|
@ -854,7 +868,7 @@ func (s *Server) startACLReplicator(ctx context.Context, replicationType structs
|
|||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
return nil
|
||||
case <-time.After((1 << failedAttempts) * time.Second):
|
||||
// do nothing
|
||||
}
|
||||
|
@ -865,23 +879,14 @@ func (s *Server) startACLReplicator(ctx context.Context, replicationType structs
|
|||
failedAttempts = 0
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
s.logger.Printf("[INFO] acl: started ACL %s replication", replicationType.SingularNoun())
|
||||
}
|
||||
|
||||
func (s *Server) stopACLReplication() {
|
||||
s.aclReplicationLock.Lock()
|
||||
defer s.aclReplicationLock.Unlock()
|
||||
|
||||
if !s.aclReplicationEnabled {
|
||||
return
|
||||
}
|
||||
|
||||
s.aclReplicationCancel()
|
||||
s.aclReplicationCancel = nil
|
||||
s.updateACLReplicationStatusStopped()
|
||||
s.aclReplicationEnabled = false
|
||||
// these will be no-ops when not started
|
||||
s.leaderRoutineManager.Stop(legacyACLReplicationRoutineName)
|
||||
s.leaderRoutineManager.Stop(aclPolicyReplicationRoutineName)
|
||||
s.leaderRoutineManager.Stop(aclRoleReplicationRoutineName)
|
||||
s.leaderRoutineManager.Stop(aclTokenReplicationRoutineName)
|
||||
}
|
||||
|
||||
func (s *Server) startConfigReplication() {
|
||||
|
@ -890,12 +895,12 @@ func (s *Server) startConfigReplication() {
|
|||
return
|
||||
}
|
||||
|
||||
s.configReplicator.Start()
|
||||
s.leaderRoutineManager.Start(configReplicationRoutineName, s.configReplicator.Run)
|
||||
}
|
||||
|
||||
func (s *Server) stopConfigReplication() {
|
||||
// will be a no-op when not started
|
||||
s.configReplicator.Stop()
|
||||
s.leaderRoutineManager.Stop(configReplicationRoutineName)
|
||||
}
|
||||
|
||||
// getOrCreateAutopilotConfig is used to get the autopilot config, initializing it if necessary
|
||||
|
|
|
@ -439,52 +439,30 @@ func (s *Server) generateCASignRequest(csr string) *structs.CASignRequest {
|
|||
|
||||
// startConnectLeader starts multi-dc connect leader routines.
|
||||
func (s *Server) startConnectLeader() {
|
||||
s.connectLock.Lock()
|
||||
defer s.connectLock.Unlock()
|
||||
|
||||
if s.connectEnabled {
|
||||
return
|
||||
}
|
||||
|
||||
s.connectCh = make(chan struct{})
|
||||
|
||||
// Start the Connect secondary DC actions if enabled.
|
||||
if s.config.ConnectEnabled && s.config.Datacenter != s.config.PrimaryDatacenter {
|
||||
go s.secondaryCARootWatch(s.connectCh)
|
||||
go s.replicateIntentions(s.connectCh)
|
||||
|
||||
s.leaderRoutineManager.Start(secondaryCARootWatchRoutineName, s.secondaryCARootWatch)
|
||||
s.leaderRoutineManager.Start(intentionReplicationRoutineName, s.replicateIntentions)
|
||||
}
|
||||
|
||||
go s.runCARootPruning(s.connectCh)
|
||||
|
||||
s.connectEnabled = true
|
||||
s.leaderRoutineManager.Start(caRootPruningRoutineName, s.runCARootPruning)
|
||||
}
|
||||
|
||||
// stopConnectLeader stops connect specific leader functions.
|
||||
func (s *Server) stopConnectLeader() {
|
||||
s.connectLock.Lock()
|
||||
defer s.connectLock.Unlock()
|
||||
|
||||
if !s.connectEnabled {
|
||||
return
|
||||
}
|
||||
|
||||
s.actingSecondaryLock.Lock()
|
||||
s.actingSecondaryCA = false
|
||||
s.actingSecondaryLock.Unlock()
|
||||
|
||||
close(s.connectCh)
|
||||
s.connectEnabled = false
|
||||
s.leaderRoutineManager.Stop(secondaryCARootWatchRoutineName)
|
||||
s.leaderRoutineManager.Stop(intentionReplicationRoutineName)
|
||||
s.leaderRoutineManager.Stop(caRootPruningRoutineName)
|
||||
}
|
||||
|
||||
func (s *Server) runCARootPruning(stopCh <-chan struct{}) {
|
||||
func (s *Server) runCARootPruning(ctx context.Context) error {
|
||||
ticker := time.NewTicker(caRootPruneInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-stopCh:
|
||||
return
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
case <-ticker.C:
|
||||
if err := s.pruneCARoots(); err != nil {
|
||||
s.logger.Printf("[ERR] connect: error pruning CA roots: %v", err)
|
||||
|
@ -549,7 +527,7 @@ func (s *Server) pruneCARoots() error {
|
|||
// secondaryCARootWatch maintains a blocking query to the primary datacenter's
|
||||
// ConnectCA.Roots endpoint to monitor when it needs to request a new signed
|
||||
// intermediate certificate.
|
||||
func (s *Server) secondaryCARootWatch(stopCh <-chan struct{}) {
|
||||
func (s *Server) secondaryCARootWatch(ctx context.Context) error {
|
||||
args := structs.DCSpecificRequest{
|
||||
Datacenter: s.config.PrimaryDatacenter,
|
||||
QueryOptions: structs.QueryOptions{
|
||||
|
@ -559,7 +537,7 @@ func (s *Server) secondaryCARootWatch(stopCh <-chan struct{}) {
|
|||
|
||||
s.logger.Printf("[DEBUG] connect: starting Connect CA root replication from primary datacenter %q", s.config.PrimaryDatacenter)
|
||||
|
||||
retryLoopBackoff(stopCh, func() error {
|
||||
retryLoopBackoff(ctx.Done(), func() error {
|
||||
var roots structs.IndexedCARoots
|
||||
if err := s.forwardDC("ConnectCA.Roots", s.config.PrimaryDatacenter, &args, &roots); err != nil {
|
||||
return fmt.Errorf("Error retrieving the primary datacenter's roots: %v", err)
|
||||
|
@ -598,18 +576,20 @@ func (s *Server) secondaryCARootWatch(stopCh <-chan struct{}) {
|
|||
}, func(err error) {
|
||||
s.logger.Printf("[ERR] connect: %v", err)
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// replicateIntentions executes a blocking query to the primary datacenter to replicate
|
||||
// the intentions there to the local state.
|
||||
func (s *Server) replicateIntentions(stopCh <-chan struct{}) {
|
||||
func (s *Server) replicateIntentions(ctx context.Context) error {
|
||||
args := structs.DCSpecificRequest{
|
||||
Datacenter: s.config.PrimaryDatacenter,
|
||||
}
|
||||
|
||||
s.logger.Printf("[DEBUG] connect: starting Connect intention replication from primary datacenter %q", s.config.PrimaryDatacenter)
|
||||
|
||||
retryLoopBackoff(stopCh, func() error {
|
||||
retryLoopBackoff(ctx.Done(), func() error {
|
||||
// Always use the latest replication token value in case it changed while looping.
|
||||
args.QueryOptions.Token = s.tokens.ReplicationToken()
|
||||
|
||||
|
@ -653,6 +633,7 @@ func (s *Server) replicateIntentions(stopCh <-chan struct{}) {
|
|||
}, func(err error) {
|
||||
s.logger.Printf("[ERR] connect: error replicating intentions: %v", err)
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
// retryLoopBackoff loops a given function indefinitely, backing off exponentially
|
||||
|
|
|
@ -0,0 +1,120 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"os"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type LeaderRoutine func(ctx context.Context) error
|
||||
|
||||
type leaderRoutine struct {
|
||||
running bool
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
type LeaderRoutineManager struct {
|
||||
lock sync.RWMutex
|
||||
logger *log.Logger
|
||||
|
||||
routines map[string]*leaderRoutine
|
||||
}
|
||||
|
||||
func NewLeaderRoutineManager(logger *log.Logger) *LeaderRoutineManager {
|
||||
if logger == nil {
|
||||
logger = log.New(os.Stderr, "", log.LstdFlags)
|
||||
}
|
||||
|
||||
return &LeaderRoutineManager{
|
||||
logger: logger,
|
||||
routines: make(map[string]*leaderRoutine),
|
||||
}
|
||||
}
|
||||
|
||||
func (m *LeaderRoutineManager) IsRunning(name string) bool {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
|
||||
if routine, ok := m.routines[name]; ok {
|
||||
return routine.running
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func (m *LeaderRoutineManager) Start(name string, routine LeaderRoutine) error {
|
||||
return m.StartWithContext(nil, name, routine)
|
||||
}
|
||||
|
||||
func (m *LeaderRoutineManager) StartWithContext(parentCtx context.Context, name string, routine LeaderRoutine) error {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
|
||||
if instance, ok := m.routines[name]; ok && instance.running {
|
||||
return nil
|
||||
}
|
||||
|
||||
if parentCtx == nil {
|
||||
parentCtx = context.Background()
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(parentCtx)
|
||||
instance := &leaderRoutine{
|
||||
running: true,
|
||||
cancel: cancel,
|
||||
}
|
||||
|
||||
go func() {
|
||||
err := routine(ctx)
|
||||
if err != nil && err != context.DeadlineExceeded && err != context.Canceled {
|
||||
m.logger.Printf("[ERROR] leader: %s routine exited with error: %v", name, err)
|
||||
} else {
|
||||
m.logger.Printf("[DEBUG] leader: stopped %s routine", name)
|
||||
}
|
||||
|
||||
m.lock.Lock()
|
||||
instance.running = false
|
||||
m.lock.Unlock()
|
||||
}()
|
||||
|
||||
m.routines[name] = instance
|
||||
m.logger.Printf("[INFO] leader: started %s routine", name)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *LeaderRoutineManager) Stop(name string) error {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
|
||||
instance, ok := m.routines[name]
|
||||
if !ok {
|
||||
// no running instance
|
||||
return nil
|
||||
}
|
||||
|
||||
if !instance.running {
|
||||
return nil
|
||||
}
|
||||
|
||||
m.logger.Printf("[DEBUG] leader: stopping %s routine", name)
|
||||
instance.cancel()
|
||||
delete(m.routines, name)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *LeaderRoutineManager) StopAll() {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
|
||||
for name, routine := range m.routines {
|
||||
if !routine.running {
|
||||
continue
|
||||
}
|
||||
m.logger.Printf("[DEBUG] leader: stopping %s routine", name)
|
||||
routine.cancel()
|
||||
}
|
||||
|
||||
// just whipe out the entire map
|
||||
m.routines = make(map[string]*leaderRoutine)
|
||||
}
|
|
@ -0,0 +1,73 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestLeaderRoutineManager(t *testing.T) {
|
||||
t.Parallel()
|
||||
var runs uint32
|
||||
var running uint32
|
||||
// tlog := testutil.NewCancellableTestLogger(t)
|
||||
// defer tlog.Cancel()
|
||||
mgr := NewLeaderRoutineManager(testutil.TestLogger(t))
|
||||
|
||||
run := func(ctx context.Context) error {
|
||||
atomic.StoreUint32(&running, 1)
|
||||
defer atomic.StoreUint32(&running, 0)
|
||||
atomic.AddUint32(&runs, 1)
|
||||
<-ctx.Done()
|
||||
return nil
|
||||
}
|
||||
|
||||
// IsRunning on unregistered service should be false
|
||||
require.False(t, mgr.IsRunning("not-found"))
|
||||
|
||||
// start
|
||||
require.NoError(t, mgr.Start("run", run))
|
||||
require.True(t, mgr.IsRunning("run"))
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
require.Equal(r, uint32(1), atomic.LoadUint32(&runs))
|
||||
require.Equal(r, uint32(1), atomic.LoadUint32(&running))
|
||||
})
|
||||
require.NoError(t, mgr.Stop("run"))
|
||||
|
||||
// ensure the background go routine was actually cancelled
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
require.Equal(r, uint32(1), atomic.LoadUint32(&runs))
|
||||
require.Equal(r, uint32(0), atomic.LoadUint32(&running))
|
||||
})
|
||||
|
||||
// restart and stop
|
||||
require.NoError(t, mgr.Start("run", run))
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
require.Equal(r, uint32(2), atomic.LoadUint32(&runs))
|
||||
require.Equal(r, uint32(1), atomic.LoadUint32(&running))
|
||||
})
|
||||
|
||||
require.NoError(t, mgr.Stop("run"))
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
require.Equal(r, uint32(0), atomic.LoadUint32(&running))
|
||||
})
|
||||
|
||||
// start with a context
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
require.NoError(t, mgr.StartWithContext(ctx, "run", run))
|
||||
cancel()
|
||||
|
||||
// The function should exit of its own accord due to the parent
|
||||
// context being canceled
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
require.Equal(r, uint32(3), atomic.LoadUint32(&runs))
|
||||
require.Equal(r, uint32(0), atomic.LoadUint32(&running))
|
||||
// the task should automatically set itself to not running if
|
||||
// it exits early
|
||||
require.False(r, mgr.IsRunning("run"))
|
||||
})
|
||||
}
|
|
@ -5,7 +5,7 @@ import (
|
|||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/lib"
|
||||
|
@ -41,14 +41,11 @@ type ReplicatorFunc func(ctx context.Context, lastRemoteIndex uint64) (index uin
|
|||
|
||||
type Replicator struct {
|
||||
name string
|
||||
lock sync.RWMutex
|
||||
running bool
|
||||
cancel context.CancelFunc
|
||||
ctx context.Context
|
||||
limiter *rate.Limiter
|
||||
waiter *lib.RetryWaiter
|
||||
replicate ReplicatorFunc
|
||||
replicateFn ReplicatorFunc
|
||||
logger *log.Logger
|
||||
lastRemoteIndex uint64
|
||||
}
|
||||
|
||||
func NewReplicator(config *ReplicatorConfig) (*Replicator, error) {
|
||||
|
@ -75,63 +72,44 @@ func NewReplicator(config *ReplicatorConfig) (*Replicator, error) {
|
|||
waiter := lib.NewRetryWaiter(minFailures, 0*time.Second, maxWait, lib.NewJitterRandomStagger(10))
|
||||
return &Replicator{
|
||||
name: config.Name,
|
||||
running: false,
|
||||
limiter: limiter,
|
||||
waiter: waiter,
|
||||
replicate: config.ReplicateFn,
|
||||
replicateFn: config.ReplicateFn,
|
||||
logger: config.Logger,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (r *Replicator) Start() {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
|
||||
if r.running {
|
||||
return
|
||||
}
|
||||
|
||||
r.ctx, r.cancel = context.WithCancel(context.Background())
|
||||
|
||||
go r.run()
|
||||
|
||||
r.running = true
|
||||
r.logger.Printf("[INFO] replication: started %s replication", r.name)
|
||||
}
|
||||
|
||||
func (r *Replicator) run() {
|
||||
var lastRemoteIndex uint64
|
||||
|
||||
func (r *Replicator) Run(ctx context.Context) error {
|
||||
defer r.logger.Printf("[INFO] replication: stopped %s replication", r.name)
|
||||
|
||||
for {
|
||||
// This ensures we aren't doing too many successful replication rounds - mostly useful when
|
||||
// the data within the primary datacenter is changing rapidly but we try to limit the amount
|
||||
// of resources replication into the secondary datacenter should take
|
||||
if err := r.limiter.Wait(r.ctx); err != nil {
|
||||
return
|
||||
if err := r.limiter.Wait(ctx); err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Perform a single round of replication
|
||||
index, exit, err := r.replicate(r.ctx, lastRemoteIndex)
|
||||
index, exit, err := r.replicateFn(ctx, atomic.LoadUint64(&r.lastRemoteIndex))
|
||||
if exit {
|
||||
// the replication function told us to exit
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
// reset the lastRemoteIndex when there is an RPC failure. This should cause a full sync to be done during
|
||||
// the next round of replication
|
||||
lastRemoteIndex = 0
|
||||
atomic.StoreUint64(&r.lastRemoteIndex, 0)
|
||||
r.logger.Printf("[WARN] replication: %s replication error (will retry if still leader): %v", r.name, err)
|
||||
} else {
|
||||
lastRemoteIndex = index
|
||||
atomic.StoreUint64(&r.lastRemoteIndex, index)
|
||||
r.logger.Printf("[DEBUG] replication: %s replication completed through remote index %d", r.name, index)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-r.ctx.Done():
|
||||
return
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
// wait some amount of time to prevent churning through many replication rounds while replication is failing
|
||||
case <-r.waiter.WaitIfErr(err):
|
||||
// do nothing
|
||||
|
@ -139,16 +117,6 @@ func (r *Replicator) run() {
|
|||
}
|
||||
}
|
||||
|
||||
func (r *Replicator) Stop() {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
|
||||
if !r.running {
|
||||
return
|
||||
}
|
||||
|
||||
r.logger.Printf("[DEBUG] replication: stopping %s replication", r.name)
|
||||
r.cancel()
|
||||
r.cancel = nil
|
||||
r.running = false
|
||||
func (r *Replicator) Index() uint64 {
|
||||
return atomic.LoadUint64(&r.lastRemoteIndex)
|
||||
}
|
||||
|
|
|
@ -4,15 +4,19 @@ import (
|
|||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestReplicationRestart(t *testing.T) {
|
||||
mgr := NewLeaderRoutineManager(testutil.TestLogger(t))
|
||||
|
||||
config := ReplicatorConfig{
|
||||
Name: "mock",
|
||||
ReplicateFn: func(ctx context.Context, lastRemoteIndex uint64) (uint64, bool, error) {
|
||||
return 1, false, nil
|
||||
},
|
||||
|
||||
Rate: 1,
|
||||
Burst: 1,
|
||||
}
|
||||
|
@ -20,9 +24,9 @@ func TestReplicationRestart(t *testing.T) {
|
|||
repl, err := NewReplicator(&config)
|
||||
require.NoError(t, err)
|
||||
|
||||
repl.Start()
|
||||
repl.Stop()
|
||||
repl.Start()
|
||||
mgr.Start("mock", repl.Run)
|
||||
mgr.Stop("mock")
|
||||
mgr.Start("mock", repl.Run)
|
||||
// Previously this would have segfaulted
|
||||
repl.Stop()
|
||||
mgr.Stop("mock")
|
||||
}
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
|
@ -88,6 +87,19 @@ const (
|
|||
reconcileChSize = 256
|
||||
)
|
||||
|
||||
const (
|
||||
legacyACLReplicationRoutineName = "legacy ACL replication"
|
||||
aclPolicyReplicationRoutineName = "ACL policy replication"
|
||||
aclRoleReplicationRoutineName = "ACL role replication"
|
||||
aclTokenReplicationRoutineName = "ACL token replication"
|
||||
aclTokenReapingRoutineName = "acl token reaping"
|
||||
aclUpgradeRoutineName = "legacy ACL token upgrade"
|
||||
caRootPruningRoutineName = "CA root pruning"
|
||||
configReplicationRoutineName = "config entry replication"
|
||||
intentionReplicationRoutineName = "intention replication"
|
||||
secondaryCARootWatchRoutineName = "secondary CA roots watch"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrWANFederationDisabled = fmt.Errorf("WAN Federation is disabled")
|
||||
)
|
||||
|
@ -101,24 +113,6 @@ type Server struct {
|
|||
// acls is used to resolve tokens to effective policies
|
||||
acls *ACLResolver
|
||||
|
||||
// aclUpgradeCancel is used to cancel the ACL upgrade goroutine when we
|
||||
// lose leadership
|
||||
aclUpgradeCancel context.CancelFunc
|
||||
aclUpgradeLock sync.RWMutex
|
||||
aclUpgradeEnabled bool
|
||||
|
||||
// aclReplicationCancel is used to shut down the ACL replication goroutine
|
||||
// when we lose leadership
|
||||
aclReplicationCancel context.CancelFunc
|
||||
aclReplicationLock sync.RWMutex
|
||||
aclReplicationEnabled bool
|
||||
|
||||
// aclTokenReapCancel is used to shut down the ACL Token expiration reap
|
||||
// goroutine when we lose leadership.
|
||||
aclTokenReapCancel context.CancelFunc
|
||||
aclTokenReapLock sync.RWMutex
|
||||
aclTokenReapEnabled bool
|
||||
|
||||
aclAuthMethodValidators map[string]*authMethodValidatorEntry
|
||||
aclAuthMethodValidatorLock sync.RWMutex
|
||||
|
||||
|
@ -271,15 +265,13 @@ type Server struct {
|
|||
shutdownCh chan struct{}
|
||||
shutdownLock sync.Mutex
|
||||
|
||||
// State for multi-dc connect leader logic
|
||||
connectLock sync.RWMutex
|
||||
connectEnabled bool
|
||||
connectCh chan struct{}
|
||||
|
||||
// State for whether this datacenter is acting as a secondary CA.
|
||||
actingSecondaryCA bool
|
||||
actingSecondaryLock sync.RWMutex
|
||||
|
||||
// Manager to handle starting/stopping go routines when establishing/revoking raft leadership
|
||||
leaderRoutineManager *LeaderRoutineManager
|
||||
|
||||
// embedded struct to hold all the enterprise specific data
|
||||
EnterpriseServer
|
||||
}
|
||||
|
@ -372,6 +364,7 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store, tl
|
|||
tombstoneGC: gc,
|
||||
serverLookup: NewServerLookup(),
|
||||
shutdownCh: shutdownCh,
|
||||
leaderRoutineManager: NewLeaderRoutineManager(logger),
|
||||
}
|
||||
|
||||
// Initialize enterprise specific server functionality
|
||||
|
@ -812,6 +805,11 @@ func (s *Server) Shutdown() error {
|
|||
s.shutdown = true
|
||||
close(s.shutdownCh)
|
||||
|
||||
// ensure that any leader routines still running get canceled
|
||||
if s.leaderRoutineManager != nil {
|
||||
s.leaderRoutineManager.StopAll()
|
||||
}
|
||||
|
||||
if s.serfLAN != nil {
|
||||
s.serfLAN.Shutdown()
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue