mirror of https://github.com/status-im/consul.git
Move some things around to allow for license updating via config reload
The bulk of this commit is moving the LeaderRoutineManager from the agent/consul package into its own package: lib/gort. It also got a renaming and its Start method now requires a context. Requiring that context required updating a whole bunch of other places in the code.
This commit is contained in:
parent
3f33ac36c5
commit
f054099e84
|
@ -0,0 +1,2 @@
|
|||
```release-note:improvement
|
||||
licensing: **(Enterprise Only)** Consul Enterprise has gained the ability update its license via a configuration reload. The same environment variables and configurations will be used to determine the new license.```
|
|
@ -49,6 +49,7 @@ import (
|
|||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/lib/file"
|
||||
"github.com/hashicorp/consul/lib/mutex"
|
||||
"github.com/hashicorp/consul/lib/routine"
|
||||
"github.com/hashicorp/consul/logging"
|
||||
"github.com/hashicorp/consul/tlsutil"
|
||||
"github.com/hashicorp/consul/types"
|
||||
|
@ -327,6 +328,10 @@ type Agent struct {
|
|||
// into Agent, which will allow us to remove this field.
|
||||
rpcClientHealth *health.Client
|
||||
|
||||
// routineManager is responsible for managing longer running go routines
|
||||
// run by the Agent
|
||||
routineManager *routine.Manager
|
||||
|
||||
// enterpriseAgent embeds fields that we only access in consul-enterprise builds
|
||||
enterpriseAgent
|
||||
}
|
||||
|
@ -371,6 +376,7 @@ func New(bd BaseDeps) (*Agent, error) {
|
|||
tlsConfigurator: bd.TLSConfigurator,
|
||||
config: bd.RuntimeConfig,
|
||||
cache: bd.Cache,
|
||||
routineManager: routine.NewManager(bd.Logger),
|
||||
}
|
||||
|
||||
// TODO: create rpcClientHealth in BaseDeps once NetRPC is available without Agent
|
||||
|
|
|
@ -290,6 +290,12 @@ type Config struct {
|
|||
SegmentName *string `mapstructure:"segment"`
|
||||
// Enterprise Only
|
||||
Segments []Segment `mapstructure:"segments"`
|
||||
|
||||
// Enterprise Only - not user configurable
|
||||
LicensePollBaseTime *string `mapstructure:"license_poll_base_time"`
|
||||
LicensePollMaxTime *string `mapstructure:"license_poll_max_time"`
|
||||
LicenseUpdateBaseTime *string `mapstructure:"license_update_base_time"`
|
||||
LicenseUpdateMaxTime *string `mapstructure:"license_update_max_time"`
|
||||
}
|
||||
|
||||
type GossipLANConfig struct {
|
||||
|
|
|
@ -30,7 +30,7 @@ func (s *Server) reapExpiredTokens(ctx context.Context) error {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *Server) startACLTokenReaping() {
|
||||
func (s *Server) startACLTokenReaping(ctx context.Context) {
|
||||
// Do a quick check for config settings that would imply the goroutine
|
||||
// below will just spin forever.
|
||||
//
|
||||
|
@ -41,7 +41,7 @@ func (s *Server) startACLTokenReaping() {
|
|||
return
|
||||
}
|
||||
|
||||
s.leaderRoutineManager.Start(aclTokenReapingRoutineName, s.reapExpiredTokens)
|
||||
s.leaderRoutineManager.Start(ctx, aclTokenReapingRoutineName, s.reapExpiredTokens)
|
||||
}
|
||||
|
||||
func (s *Server) stopACLTokenReaping() {
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"net"
|
||||
"strings"
|
||||
|
@ -46,7 +47,7 @@ func (s *Server) handleEnterpriseLeave() {
|
|||
return
|
||||
}
|
||||
|
||||
func (s *Server) establishEnterpriseLeadership() error {
|
||||
func (s *Server) establishEnterpriseLeadership(_ context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -115,7 +115,7 @@ func (s *Server) monitorLeadership() {
|
|||
|
||||
if canUpgrade := s.canUpgradeToNewACLs(weAreLeaderCh != nil); canUpgrade {
|
||||
if weAreLeaderCh != nil {
|
||||
if err := s.initializeACLs(true); err != nil {
|
||||
if err := s.initializeACLs(&lib.StopChannelContext{StopCh: weAreLeaderCh}, true); err != nil {
|
||||
s.logger.Error("error transitioning to using new ACLs", "error", err)
|
||||
continue
|
||||
}
|
||||
|
@ -308,12 +308,12 @@ func (s *Server) establishLeadership(ctx context.Context) error {
|
|||
// check for the upgrade here - this helps us transition to new ACLs much
|
||||
// quicker if this is a new cluster or this is a test agent
|
||||
if canUpgrade := s.canUpgradeToNewACLs(true); canUpgrade {
|
||||
if err := s.initializeACLs(true); err != nil {
|
||||
if err := s.initializeACLs(ctx, true); err != nil {
|
||||
return err
|
||||
}
|
||||
atomic.StoreInt32(&s.useNewACLs, 1)
|
||||
s.updateACLAdvertisement()
|
||||
} else if err := s.initializeACLs(false); err != nil {
|
||||
} else if err := s.initializeACLs(ctx, false); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -337,20 +337,20 @@ func (s *Server) establishLeadership(ctx context.Context) error {
|
|||
return err
|
||||
}
|
||||
|
||||
if err := s.establishEnterpriseLeadership(); err != nil {
|
||||
if err := s.establishEnterpriseLeadership(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.getOrCreateAutopilotConfig()
|
||||
s.autopilot.Start(ctx)
|
||||
|
||||
s.startConfigReplication()
|
||||
s.startConfigReplication(ctx)
|
||||
|
||||
s.startFederationStateReplication()
|
||||
s.startFederationStateReplication(ctx)
|
||||
|
||||
s.startFederationStateAntiEntropy()
|
||||
s.startFederationStateAntiEntropy(ctx)
|
||||
|
||||
if err := s.startConnectLeader(); err != nil {
|
||||
if err := s.startConnectLeader(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -499,7 +499,7 @@ func (s *Server) initializeLegacyACL() error {
|
|||
|
||||
// initializeACLs is used to setup the ACLs if we are the leader
|
||||
// and need to do this.
|
||||
func (s *Server) initializeACLs(upgrade bool) error {
|
||||
func (s *Server) initializeACLs(ctx context.Context, upgrade bool) error {
|
||||
if !s.config.ACLsEnabled {
|
||||
return nil
|
||||
}
|
||||
|
@ -673,11 +673,11 @@ func (s *Server) initializeACLs(upgrade bool) error {
|
|||
}
|
||||
}
|
||||
// launch the upgrade go routine to generate accessors for everything
|
||||
s.startACLUpgrade()
|
||||
s.startACLUpgrade(ctx)
|
||||
} else {
|
||||
if s.UseLegacyACLs() && !upgrade {
|
||||
if s.IsACLReplicationEnabled() {
|
||||
s.startLegacyACLReplication()
|
||||
s.startLegacyACLReplication(ctx)
|
||||
}
|
||||
// return early as we don't want to start new ACL replication
|
||||
// or ACL token reaping as these are new ACL features.
|
||||
|
@ -689,10 +689,10 @@ func (s *Server) initializeACLs(upgrade bool) error {
|
|||
}
|
||||
|
||||
// ACL replication is now mandatory
|
||||
s.startACLReplication()
|
||||
s.startACLReplication(ctx)
|
||||
}
|
||||
|
||||
s.startACLTokenReaping()
|
||||
s.startACLTokenReaping(ctx)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -771,13 +771,13 @@ func (s *Server) legacyACLTokenUpgrade(ctx context.Context) error {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *Server) startACLUpgrade() {
|
||||
func (s *Server) startACLUpgrade(ctx context.Context) {
|
||||
if s.config.PrimaryDatacenter != s.config.Datacenter {
|
||||
// token upgrades should only run in the primary
|
||||
return
|
||||
}
|
||||
|
||||
s.leaderRoutineManager.Start(aclUpgradeRoutineName, s.legacyACLTokenUpgrade)
|
||||
s.leaderRoutineManager.Start(ctx, aclUpgradeRoutineName, s.legacyACLTokenUpgrade)
|
||||
}
|
||||
|
||||
func (s *Server) stopACLUpgrade() {
|
||||
|
@ -826,7 +826,7 @@ func (s *Server) runLegacyACLReplication(ctx context.Context) error {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *Server) startLegacyACLReplication() {
|
||||
func (s *Server) startLegacyACLReplication(ctx context.Context) {
|
||||
if s.InACLDatacenter() {
|
||||
return
|
||||
}
|
||||
|
@ -840,12 +840,12 @@ func (s *Server) startLegacyACLReplication() {
|
|||
|
||||
s.initReplicationStatus()
|
||||
|
||||
s.leaderRoutineManager.Start(legacyACLReplicationRoutineName, s.runLegacyACLReplication)
|
||||
s.leaderRoutineManager.Start(ctx, legacyACLReplicationRoutineName, s.runLegacyACLReplication)
|
||||
s.logger.Info("started legacy ACL replication")
|
||||
s.updateACLReplicationStatusRunning(structs.ACLReplicateLegacy)
|
||||
}
|
||||
|
||||
func (s *Server) startACLReplication() {
|
||||
func (s *Server) startACLReplication(ctx context.Context) {
|
||||
if s.InACLDatacenter() {
|
||||
return
|
||||
}
|
||||
|
@ -858,11 +858,11 @@ func (s *Server) startACLReplication() {
|
|||
}
|
||||
|
||||
s.initReplicationStatus()
|
||||
s.leaderRoutineManager.Start(aclPolicyReplicationRoutineName, s.runACLPolicyReplicator)
|
||||
s.leaderRoutineManager.Start(aclRoleReplicationRoutineName, s.runACLRoleReplicator)
|
||||
s.leaderRoutineManager.Start(ctx, aclPolicyReplicationRoutineName, s.runACLPolicyReplicator)
|
||||
s.leaderRoutineManager.Start(ctx, aclRoleReplicationRoutineName, s.runACLRoleReplicator)
|
||||
|
||||
if s.config.ACLTokenReplication {
|
||||
s.leaderRoutineManager.Start(aclTokenReplicationRoutineName, s.runACLTokenReplicator)
|
||||
s.leaderRoutineManager.Start(ctx, aclTokenReplicationRoutineName, s.runACLTokenReplicator)
|
||||
s.updateACLReplicationStatusRunning(structs.ACLReplicateTokens)
|
||||
} else {
|
||||
s.updateACLReplicationStatusRunning(structs.ACLReplicatePolicies)
|
||||
|
@ -973,13 +973,13 @@ func (s *Server) stopACLReplication() {
|
|||
s.leaderRoutineManager.Stop(aclTokenReplicationRoutineName)
|
||||
}
|
||||
|
||||
func (s *Server) startConfigReplication() {
|
||||
func (s *Server) startConfigReplication(ctx context.Context) {
|
||||
if s.config.PrimaryDatacenter == "" || s.config.PrimaryDatacenter == s.config.Datacenter {
|
||||
// replication shouldn't run in the primary DC
|
||||
return
|
||||
}
|
||||
|
||||
s.leaderRoutineManager.Start(configReplicationRoutineName, s.configReplicator.Run)
|
||||
s.leaderRoutineManager.Start(ctx, configReplicationRoutineName, s.configReplicator.Run)
|
||||
}
|
||||
|
||||
func (s *Server) stopConfigReplication() {
|
||||
|
@ -987,7 +987,7 @@ func (s *Server) stopConfigReplication() {
|
|||
s.leaderRoutineManager.Stop(configReplicationRoutineName)
|
||||
}
|
||||
|
||||
func (s *Server) startFederationStateReplication() {
|
||||
func (s *Server) startFederationStateReplication(ctx context.Context) {
|
||||
if s.config.PrimaryDatacenter == "" || s.config.PrimaryDatacenter == s.config.Datacenter {
|
||||
// replication shouldn't run in the primary DC
|
||||
return
|
||||
|
@ -998,7 +998,7 @@ func (s *Server) startFederationStateReplication() {
|
|||
s.gatewayLocator.SetLastFederationStateReplicationError(nil, false)
|
||||
}
|
||||
|
||||
s.leaderRoutineManager.Start(federationStateReplicationRoutineName, s.federationStateReplicator.Run)
|
||||
s.leaderRoutineManager.Start(ctx, federationStateReplicationRoutineName, s.federationStateReplicator.Run)
|
||||
}
|
||||
|
||||
func (s *Server) stopFederationStateReplication() {
|
||||
|
|
|
@ -29,15 +29,15 @@ var (
|
|||
)
|
||||
|
||||
// startConnectLeader starts multi-dc connect leader routines.
|
||||
func (s *Server) startConnectLeader() error {
|
||||
func (s *Server) startConnectLeader(ctx context.Context) error {
|
||||
if !s.config.ConnectEnabled {
|
||||
return nil
|
||||
}
|
||||
|
||||
s.caManager.Start()
|
||||
s.leaderRoutineManager.Start(caRootPruningRoutineName, s.runCARootPruning)
|
||||
s.caManager.Start(ctx)
|
||||
s.leaderRoutineManager.Start(ctx, caRootPruningRoutineName, s.runCARootPruning)
|
||||
|
||||
return s.startIntentionConfigEntryMigration()
|
||||
return s.startIntentionConfigEntryMigration(ctx)
|
||||
}
|
||||
|
||||
// stopConnectLeader stops connect specific leader functions.
|
||||
|
|
|
@ -13,6 +13,7 @@ import (
|
|||
"github.com/hashicorp/consul/agent/connect/ca"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/lib/routine"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
uuid "github.com/hashicorp/go-uuid"
|
||||
)
|
||||
|
@ -65,7 +66,7 @@ type CAManager struct {
|
|||
primaryRoots structs.IndexedCARoots // The most recently seen state of the root CAs from the primary datacenter.
|
||||
actingSecondaryCA bool // True if this datacenter has been initialized as a secondary CA.
|
||||
|
||||
leaderRoutineManager *LeaderRoutineManager
|
||||
leaderRoutineManager *routine.Manager
|
||||
}
|
||||
|
||||
type caDelegateWithState struct {
|
||||
|
@ -76,7 +77,7 @@ func (c *caDelegateWithState) State() *state.Store {
|
|||
return c.fsm.State()
|
||||
}
|
||||
|
||||
func NewCAManager(delegate caServerDelegate, leaderRoutineManager *LeaderRoutineManager, logger hclog.Logger, config *Config) *CAManager {
|
||||
func NewCAManager(delegate caServerDelegate, leaderRoutineManager *routine.Manager, logger hclog.Logger, config *Config) *CAManager {
|
||||
return &CAManager{
|
||||
delegate: delegate,
|
||||
logger: logger,
|
||||
|
@ -247,7 +248,7 @@ func (c *CAManager) setCAProvider(newProvider ca.Provider, root *structs.CARoot)
|
|||
c.providerLock.Unlock()
|
||||
}
|
||||
|
||||
func (c *CAManager) Start() {
|
||||
func (c *CAManager) Start(ctx context.Context) {
|
||||
// Attempt to initialize the Connect CA now. This will
|
||||
// happen during leader establishment and it would be great
|
||||
// if the CA was ready to go once that process was finished.
|
||||
|
@ -257,11 +258,11 @@ func (c *CAManager) Start() {
|
|||
// we failed to fully initialize the CA so we need to spawn a
|
||||
// go routine to retry this process until it succeeds or we lose
|
||||
// leadership and the go routine gets stopped.
|
||||
c.leaderRoutineManager.Start(backgroundCAInitializationRoutineName, c.backgroundCAInitialization)
|
||||
c.leaderRoutineManager.Start(ctx, backgroundCAInitializationRoutineName, c.backgroundCAInitialization)
|
||||
} else {
|
||||
// We only start these if CA initialization was successful. If not the completion of the
|
||||
// background CA initialization will start these routines.
|
||||
c.startPostInitializeRoutines()
|
||||
c.startPostInitializeRoutines(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -271,13 +272,13 @@ func (c *CAManager) Stop() {
|
|||
c.leaderRoutineManager.Stop(backgroundCAInitializationRoutineName)
|
||||
}
|
||||
|
||||
func (c *CAManager) startPostInitializeRoutines() {
|
||||
func (c *CAManager) startPostInitializeRoutines(ctx context.Context) {
|
||||
// Start the Connect secondary DC actions if enabled.
|
||||
if c.serverConf.Datacenter != c.serverConf.PrimaryDatacenter {
|
||||
c.leaderRoutineManager.Start(secondaryCARootWatchRoutineName, c.secondaryCARootWatch)
|
||||
c.leaderRoutineManager.Start(ctx, secondaryCARootWatchRoutineName, c.secondaryCARootWatch)
|
||||
}
|
||||
|
||||
c.leaderRoutineManager.Start(intermediateCertRenewWatchRoutineName, c.intermediateCertRenewalWatch)
|
||||
c.leaderRoutineManager.Start(ctx, intermediateCertRenewWatchRoutineName, c.intermediateCertRenewalWatch)
|
||||
}
|
||||
|
||||
func (c *CAManager) backgroundCAInitialization(ctx context.Context) error {
|
||||
|
@ -294,7 +295,7 @@ func (c *CAManager) backgroundCAInitialization(ctx context.Context) error {
|
|||
|
||||
c.logger.Info("Successfully initialized the Connect CA")
|
||||
|
||||
c.startPostInitializeRoutines()
|
||||
c.startPostInitializeRoutines(ctx)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -17,7 +17,7 @@ const (
|
|||
federationStatePruneInterval = time.Hour
|
||||
)
|
||||
|
||||
func (s *Server) startFederationStateAntiEntropy() {
|
||||
func (s *Server) startFederationStateAntiEntropy(ctx context.Context) {
|
||||
// Check to see if we can skip waiting for serf feature detection below.
|
||||
if !s.DatacenterSupportsFederationStates() {
|
||||
_, fedStates, err := s.fsm.State().FederationStateList(nil)
|
||||
|
@ -31,12 +31,12 @@ func (s *Server) startFederationStateAntiEntropy() {
|
|||
if s.config.DisableFederationStateAntiEntropy {
|
||||
return
|
||||
}
|
||||
s.leaderRoutineManager.Start(federationStateAntiEntropyRoutineName, s.federationStateAntiEntropySync)
|
||||
s.leaderRoutineManager.Start(ctx, federationStateAntiEntropyRoutineName, s.federationStateAntiEntropySync)
|
||||
|
||||
// If this is the primary, then also prune any stale datacenters from the
|
||||
// list of federation states.
|
||||
if s.config.PrimaryDatacenter == s.config.Datacenter {
|
||||
s.leaderRoutineManager.Start(federationStatePruningRoutineName, s.federationStatePruning)
|
||||
s.leaderRoutineManager.Start(ctx, federationStatePruningRoutineName, s.federationStatePruning)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -15,7 +15,7 @@ const (
|
|||
maxIntentionTxnSize = raftWarnSize / 4
|
||||
)
|
||||
|
||||
func (s *Server) startIntentionConfigEntryMigration() error {
|
||||
func (s *Server) startIntentionConfigEntryMigration(ctx context.Context) error {
|
||||
if !s.config.ConnectEnabled {
|
||||
return nil
|
||||
}
|
||||
|
@ -56,14 +56,14 @@ func (s *Server) startIntentionConfigEntryMigration() error {
|
|||
}
|
||||
|
||||
// When running in the primary we do all of the real work.
|
||||
s.leaderRoutineManager.Start(intentionMigrationRoutineName, s.legacyIntentionMigration)
|
||||
s.leaderRoutineManager.Start(ctx, intentionMigrationRoutineName, s.legacyIntentionMigration)
|
||||
} else {
|
||||
// When running in the secondary we mostly just wait until the
|
||||
// primary finishes, and then wait until we're pretty sure the main
|
||||
// config entry replication thread has seen all of the
|
||||
// migration-related config entry edits before zeroing OUR copy of
|
||||
// the old intentions table.
|
||||
s.leaderRoutineManager.Start(intentionMigrationRoutineName, s.legacyIntentionMigrationInSecondaryDC)
|
||||
s.leaderRoutineManager.Start(ctx, intentionMigrationRoutineName, s.legacyIntentionMigrationInSecondaryDC)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/lib/routine"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/stretchr/testify/mock"
|
||||
|
@ -12,7 +13,7 @@ import (
|
|||
)
|
||||
|
||||
func TestReplicationRestart(t *testing.T) {
|
||||
mgr := NewLeaderRoutineManager(testutil.Logger(t))
|
||||
mgr := routine.NewManager(testutil.Logger(t))
|
||||
|
||||
config := ReplicatorConfig{
|
||||
Name: "mock",
|
||||
|
@ -30,9 +31,9 @@ func TestReplicationRestart(t *testing.T) {
|
|||
repl, err := NewReplicator(&config)
|
||||
require.NoError(t, err)
|
||||
|
||||
mgr.Start("mock", repl.Run)
|
||||
mgr.Start(context.Background(), "mock", repl.Run)
|
||||
mgr.Stop("mock")
|
||||
mgr.Start("mock", repl.Run)
|
||||
mgr.Start(context.Background(), "mock", repl.Run)
|
||||
// Previously this would have segfaulted
|
||||
mgr.Stop("mock")
|
||||
}
|
||||
|
|
|
@ -43,6 +43,7 @@ import (
|
|||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/agent/token"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/lib/routine"
|
||||
"github.com/hashicorp/consul/logging"
|
||||
"github.com/hashicorp/consul/proto/pbsubscribe"
|
||||
"github.com/hashicorp/consul/tlsutil"
|
||||
|
@ -298,7 +299,7 @@ type Server struct {
|
|||
dcSupportsIntentionsAsConfigEntries int32
|
||||
|
||||
// Manager to handle starting/stopping go routines when establishing/revoking raft leadership
|
||||
leaderRoutineManager *LeaderRoutineManager
|
||||
leaderRoutineManager *routine.Manager
|
||||
|
||||
// embedded struct to hold all the enterprise specific data
|
||||
EnterpriseServer
|
||||
|
@ -375,7 +376,7 @@ func NewServer(config *Config, flat Deps) (*Server, error) {
|
|||
tombstoneGC: gc,
|
||||
serverLookup: NewServerLookup(),
|
||||
shutdownCh: shutdownCh,
|
||||
leaderRoutineManager: NewLeaderRoutineManager(logger),
|
||||
leaderRoutineManager: routine.NewManager(logger.Named(logging.Leader)),
|
||||
aclAuthMethodValidators: authmethod.NewCache(),
|
||||
fsm: newFSMFromConfig(flat.Logger, gc, config),
|
||||
}
|
||||
|
@ -1319,6 +1320,10 @@ func (s *Server) RegisterEndpoint(name string, handler interface{}) error {
|
|||
return s.rpcServer.RegisterName(name, handler)
|
||||
}
|
||||
|
||||
func (s *Server) FSM() *fsm.FSM {
|
||||
return s.fsm
|
||||
}
|
||||
|
||||
// Stats is used to return statistics for debugging and insight
|
||||
// for various sub-systems
|
||||
func (s *Server) Stats() map[string]map[string]string {
|
||||
|
|
|
@ -125,7 +125,7 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error)
|
|||
TLSConfigurator: d.TLSConfigurator,
|
||||
Cache: d.Cache,
|
||||
Tokens: d.Tokens,
|
||||
EnterpriseConfig: initEnterpriseAutoConfig(d.EnterpriseDeps),
|
||||
EnterpriseConfig: initEnterpriseAutoConfig(d.EnterpriseDeps, cfg),
|
||||
}
|
||||
|
||||
d.AutoConfig, err = autoconf.New(acConf)
|
||||
|
|
|
@ -15,6 +15,6 @@ func initEnterpriseBaseDeps(d BaseDeps, _ *config.RuntimeConfig) (BaseDeps, erro
|
|||
}
|
||||
|
||||
// initEnterpriseAutoConfig is responsible for setting up auto-config for enterprise
|
||||
func initEnterpriseAutoConfig(_ consul.EnterpriseDeps) autoconf.EnterpriseConfig {
|
||||
func initEnterpriseAutoConfig(_ consul.EnterpriseDeps, _ *config.RuntimeConfig) autoconf.EnterpriseConfig {
|
||||
return autoconf.EnterpriseConfig{}
|
||||
}
|
||||
|
|
|
@ -86,7 +86,9 @@ type TestAgent struct {
|
|||
// The caller is responsible for calling Shutdown() to stop the agent and remove
|
||||
// temporary directories.
|
||||
func NewTestAgent(t *testing.T, hcl string) *TestAgent {
|
||||
return StartTestAgent(t, TestAgent{HCL: hcl})
|
||||
a := StartTestAgent(t, TestAgent{HCL: hcl})
|
||||
t.Cleanup(func() { a.Shutdown() })
|
||||
return a
|
||||
}
|
||||
|
||||
// StartTestAgent and wait for it to become available. If the agent fails to
|
||||
|
|
|
@ -1,22 +1,21 @@
|
|||
package consul
|
||||
package routine
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
"github.com/hashicorp/consul/logging"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
)
|
||||
|
||||
type LeaderRoutine func(ctx context.Context) error
|
||||
type Routine func(ctx context.Context) error
|
||||
|
||||
type leaderRoutine struct {
|
||||
type routineTracker struct {
|
||||
cancel context.CancelFunc
|
||||
stoppedCh chan struct{} // closed when no longer running
|
||||
}
|
||||
|
||||
func (r *leaderRoutine) running() bool {
|
||||
func (r *routineTracker) running() bool {
|
||||
select {
|
||||
case <-r.stoppedCh:
|
||||
return false
|
||||
|
@ -25,27 +24,27 @@ func (r *leaderRoutine) running() bool {
|
|||
}
|
||||
}
|
||||
|
||||
type LeaderRoutineManager struct {
|
||||
type Manager struct {
|
||||
lock sync.RWMutex
|
||||
logger hclog.Logger
|
||||
|
||||
routines map[string]*leaderRoutine
|
||||
routines map[string]*routineTracker
|
||||
}
|
||||
|
||||
func NewLeaderRoutineManager(logger hclog.Logger) *LeaderRoutineManager {
|
||||
func NewManager(logger hclog.Logger) *Manager {
|
||||
if logger == nil {
|
||||
logger = hclog.New(&hclog.LoggerOptions{
|
||||
Output: os.Stderr,
|
||||
})
|
||||
}
|
||||
|
||||
return &LeaderRoutineManager{
|
||||
logger: logger.Named(logging.Leader),
|
||||
routines: make(map[string]*leaderRoutine),
|
||||
return &Manager{
|
||||
logger: logger,
|
||||
routines: make(map[string]*routineTracker),
|
||||
}
|
||||
}
|
||||
|
||||
func (m *LeaderRoutineManager) IsRunning(name string) bool {
|
||||
func (m *Manager) IsRunning(name string) bool {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
|
||||
|
@ -56,11 +55,7 @@ func (m *LeaderRoutineManager) IsRunning(name string) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func (m *LeaderRoutineManager) Start(name string, routine LeaderRoutine) error {
|
||||
return m.StartWithContext(context.TODO(), name, routine)
|
||||
}
|
||||
|
||||
func (m *LeaderRoutineManager) StartWithContext(parentCtx context.Context, name string, routine LeaderRoutine) error {
|
||||
func (m *Manager) Start(ctx context.Context, name string, routine Routine) error {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
|
||||
|
@ -68,38 +63,41 @@ func (m *LeaderRoutineManager) StartWithContext(parentCtx context.Context, name
|
|||
return nil
|
||||
}
|
||||
|
||||
if parentCtx == nil {
|
||||
parentCtx = context.Background()
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(parentCtx)
|
||||
instance := &leaderRoutine{
|
||||
rtCtx, cancel := context.WithCancel(ctx)
|
||||
instance := &routineTracker{
|
||||
cancel: cancel,
|
||||
stoppedCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
close(instance.stoppedCh)
|
||||
}()
|
||||
|
||||
err := routine(ctx)
|
||||
if err != nil && err != context.DeadlineExceeded && err != context.Canceled {
|
||||
m.logger.Error("routine exited with error",
|
||||
"routine", name,
|
||||
"error", err,
|
||||
)
|
||||
} else {
|
||||
m.logger.Debug("stopped routine", "routine", name)
|
||||
}
|
||||
}()
|
||||
go m.execute(rtCtx, name, routine, instance.stoppedCh)
|
||||
|
||||
m.routines[name] = instance
|
||||
m.logger.Info("started routine", "routine", name)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *LeaderRoutineManager) Stop(name string) <-chan struct{} {
|
||||
// execute will run the given routine in the foreground and close the given channel when its done executing
|
||||
func (m *Manager) execute(ctx context.Context, name string, routine Routine, done chan struct{}) {
|
||||
defer func() {
|
||||
close(done)
|
||||
}()
|
||||
|
||||
err := routine(ctx)
|
||||
if err != nil && err != context.DeadlineExceeded && err != context.Canceled {
|
||||
m.logger.Error("routine exited with error",
|
||||
"routine", name,
|
||||
"error", err,
|
||||
)
|
||||
} else {
|
||||
m.logger.Debug("stopped routine", "routine", name)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Manager) Stop(name string) <-chan struct{} {
|
||||
instance := m.stopInstance(name)
|
||||
if instance == nil {
|
||||
// Fabricate a closed channel so it won't block forever.
|
||||
|
@ -111,7 +109,7 @@ func (m *LeaderRoutineManager) Stop(name string) <-chan struct{} {
|
|||
return instance.stoppedCh
|
||||
}
|
||||
|
||||
func (m *LeaderRoutineManager) stopInstance(name string) *leaderRoutine {
|
||||
func (m *Manager) stopInstance(name string) *routineTracker {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
|
||||
|
@ -133,7 +131,7 @@ func (m *LeaderRoutineManager) stopInstance(name string) *leaderRoutine {
|
|||
return instance
|
||||
}
|
||||
|
||||
func (m *LeaderRoutineManager) StopAll() {
|
||||
func (m *Manager) StopAll() {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
|
||||
|
@ -146,5 +144,5 @@ func (m *LeaderRoutineManager) StopAll() {
|
|||
}
|
||||
|
||||
// just wipe out the entire map
|
||||
m.routines = make(map[string]*leaderRoutine)
|
||||
m.routines = make(map[string]*routineTracker)
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package consul
|
||||
package routine
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@ -10,13 +10,11 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestLeaderRoutineManager(t *testing.T) {
|
||||
func TestManager(t *testing.T) {
|
||||
t.Parallel()
|
||||
var runs uint32
|
||||
var running uint32
|
||||
// tlog := testutil.NewCancellableTestLogger(t)
|
||||
// defer tlog.Cancel()
|
||||
mgr := NewLeaderRoutineManager(testutil.Logger(t))
|
||||
mgr := NewManager(testutil.Logger(t))
|
||||
|
||||
run := func(ctx context.Context) error {
|
||||
atomic.StoreUint32(&running, 1)
|
||||
|
@ -30,7 +28,7 @@ func TestLeaderRoutineManager(t *testing.T) {
|
|||
require.False(t, mgr.IsRunning("not-found"))
|
||||
|
||||
// start
|
||||
require.NoError(t, mgr.Start("run", run))
|
||||
require.NoError(t, mgr.Start(context.Background(), "run", run))
|
||||
require.True(t, mgr.IsRunning("run"))
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
require.Equal(r, uint32(1), atomic.LoadUint32(&runs))
|
||||
|
@ -47,7 +45,7 @@ func TestLeaderRoutineManager(t *testing.T) {
|
|||
})
|
||||
|
||||
// restart and stop
|
||||
require.NoError(t, mgr.Start("run", run))
|
||||
require.NoError(t, mgr.Start(context.Background(), "run", run))
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
require.Equal(r, uint32(2), atomic.LoadUint32(&runs))
|
||||
require.Equal(r, uint32(1), atomic.LoadUint32(&running))
|
||||
|
@ -63,7 +61,7 @@ func TestLeaderRoutineManager(t *testing.T) {
|
|||
|
||||
// start with a context
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
require.NoError(t, mgr.StartWithContext(ctx, "run", run))
|
||||
require.NoError(t, mgr.Start(ctx, "run", run))
|
||||
cancel()
|
||||
|
||||
// The function should exit of its own accord due to the parent
|
||||
|
@ -76,3 +74,28 @@ func TestLeaderRoutineManager(t *testing.T) {
|
|||
require.False(r, mgr.IsRunning("run"))
|
||||
})
|
||||
}
|
||||
|
||||
func TestManager_StopAll(t *testing.T) {
|
||||
t.Parallel()
|
||||
var runs uint32
|
||||
var running uint32
|
||||
mgr := NewManager(testutil.Logger(t))
|
||||
|
||||
run := func(ctx context.Context) error {
|
||||
atomic.StoreUint32(&running, 1)
|
||||
defer atomic.StoreUint32(&running, 0)
|
||||
atomic.AddUint32(&runs, 1)
|
||||
<-ctx.Done()
|
||||
return nil
|
||||
}
|
||||
|
||||
require.NoError(t, mgr.Start(context.Background(), "run1", run))
|
||||
require.NoError(t, mgr.Start(context.Background(), "run2", run))
|
||||
|
||||
mgr.StopAll()
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
require.False(r, mgr.IsRunning("run1"))
|
||||
require.False(r, mgr.IsRunning("run2"))
|
||||
})
|
||||
}
|
Loading…
Reference in New Issue