diff --git a/agent/ae/ae.go b/agent/ae/ae.go index 6572d815bc..b055f7aa17 100644 --- a/agent/ae/ae.go +++ b/agent/ae/ae.go @@ -36,11 +36,6 @@ func scaleFactor(nodes int) int { return int(math.Ceil(math.Log2(float64(nodes))-math.Log2(float64(scaleThreshold))) + 1.0) } -type State interface { - SyncChanges() error - SyncFull() error -} - // StateSyncer manages background synchronization of the given state. // // The state is synchronized on a regular basis or on demand when either @@ -49,45 +44,34 @@ type State interface { // The regular state sychronization provides a self-healing mechanism // for the cluster which is also called anti-entropy. type StateSyncer struct { + // paused flags whether sync runs are temporarily disabled. + // Must be the first element due to a go bug. + // todo(fs): which bug? Is this still relevant? + paused int32 + // State contains the data that needs to be synchronized. - State State + State interface { + SyncChanges() error + SyncFull() error + } // Interval is the time between two regular sync runs. Interval time.Duration + // ClusterSize returns the number of members in the cluster to + // allow staggering the sync runs based on cluster size. + ClusterSize func() int + // ShutdownCh is closed when the application is shutting down. ShutdownCh chan struct{} - // Logger is the logger. + // ServerUpCh contains data when a new consul server has been added to the cluster. + ServerUpCh chan struct{} + + // TriggerCh contains data when a sync should run immediately. + TriggerCh chan struct{} + Logger *log.Logger - - // ClusterSize returns the number of members in the cluster to - // allow staggering the sync runs based on cluster size. - // This needs to be set before Run() is called. - ClusterSize func() int - - // SyncFull allows triggering an immediate but staggered full sync - // in a non-blocking way. - SyncFull *Trigger - - // SyncChanges allows triggering an immediate partial sync - // in a non-blocking way. - SyncChanges *Trigger - - // paused stores whether sync runs are temporarily disabled. - paused *toggle -} - -func NewStateSyner(state State, intv time.Duration, shutdownCh chan struct{}, logger *log.Logger) *StateSyncer { - return &StateSyncer{ - State: state, - Interval: intv, - ShutdownCh: shutdownCh, - Logger: logger, - SyncFull: NewTrigger(), - SyncChanges: NewTrigger(), - paused: new(toggle), - } } const ( @@ -102,10 +86,6 @@ const ( // Run is the long running method to perform state synchronization // between local and remote servers. func (s *StateSyncer) Run() { - if s.ClusterSize == nil { - panic("ClusterSize not set") - } - stagger := func(d time.Duration) time.Duration { f := scaleFactor(s.ClusterSize()) return lib.RandomStagger(time.Duration(f) * d) @@ -113,18 +93,20 @@ func (s *StateSyncer) Run() { FullSync: for { - // attempt a full sync - if err := s.State.SyncFull(); err != nil { + switch err := s.State.SyncFull(); { + + // full sync failed + case err != nil: s.Logger.Printf("[ERR] agent: failed to sync remote state: %v", err) // retry full sync after some time or when a consul // server was added. select { - // trigger a full sync immediately. - // this is usually called when a consul server was added to the cluster. - // stagger the delay to avoid a thundering herd. - case <-s.SyncFull.Notif(): + // consul server added to cluster. 
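The staggering math above is what keeps a newly added server from being hammered by every agent at once: scaleFactor grows with log2 of the cluster size, and the retry delay is a random duration within scaleFactor * serverUpIntv. A self-contained sketch of that calculation, assuming scaleThreshold = 128 and the small-cluster guard (both defined outside this hunk) and inlining lib.RandomStagger as a uniform random duration in [0, d):

```go
package main

import (
	"fmt"
	"math"
	"math/rand"
	"time"
)

// Assumed value; the constant is defined outside the hunk shown above.
const scaleThreshold = 128

// scaleFactor grows logarithmically with cluster size so that large
// clusters spread their sync retries over a wider window. The guard for
// small clusters is assumed; the hunk only shows the return expression.
func scaleFactor(nodes int) int {
	if nodes <= scaleThreshold {
		return 1
	}
	return int(math.Ceil(math.Log2(float64(nodes))-math.Log2(float64(scaleThreshold))) + 1.0)
}

// randomStagger stands in for lib.RandomStagger: a uniformly random
// duration in [0, d).
func randomStagger(d time.Duration) time.Duration {
	return time.Duration(rand.Int63n(int64(d)))
}

func main() {
	const serverUpIntv = 3 * time.Second // assumed retry base interval
	for _, nodes := range []int{10, 128, 1024, 8192} {
		f := scaleFactor(nodes)
		fmt.Printf("nodes=%5d factor=%d retry within %v\n",
			nodes, f, randomStagger(time.Duration(f)*serverUpIntv))
	}
}
```

At 8192 nodes the factor works out to 7, so a server-up retry lands anywhere within 21s instead of every agent retrying after the same base interval.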
+ // retry sooner than retryFailIntv to converge cluster sooner + // but stagger delay to avoid thundering herd + case <-s.ServerUpCh: select { case <-time.After(stagger(serverUpIntv)): case <-s.ShutdownCh: @@ -139,38 +121,36 @@ FullSync: return } - continue - } + // full sync OK + default: - // do partial syncs until it is time for a full sync again - for { - select { - // trigger a full sync immediately - // this is usually called when a consul server was added to the cluster. - // stagger the delay to avoid a thundering herd. - case <-s.SyncFull.Notif(): + // do partial syncs until it is time for a full sync again + for { select { - case <-time.After(stagger(serverUpIntv)): + // todo(fs): why don't we honor the ServerUpCh here as well? + // todo(fs): by default, s.Interval is 60s which is >> 3s (serverUpIntv) + // case <-s.ServerUpCh: + // select { + // case <-time.After(stagger(serverUpIntv)): + // continue Sync + // case <-s.ShutdownCh: + // return + // } + + case <-time.After(s.Interval + stagger(s.Interval)): continue FullSync + + case <-s.TriggerCh: + if s.Paused() { + continue + } + if err := s.State.SyncChanges(); err != nil { + s.Logger.Printf("[ERR] agent: failed to sync changes: %v", err) + } + case <-s.ShutdownCh: return } - - // time for a full sync again - case <-time.After(s.Interval + stagger(s.Interval)): - continue FullSync - - // do partial syncs on demand - case <-s.SyncChanges.Notif(): - if s.Paused() { - continue - } - if err := s.State.SyncChanges(); err != nil { - s.Logger.Printf("[ERR] agent: failed to sync changes: %v", err) - } - - case <-s.ShutdownCh: - return } } } @@ -178,38 +158,27 @@ FullSync: // Pause temporarily disables sync runs. func (s *StateSyncer) Pause() { - s.paused.On() + atomic.AddInt32(&s.paused, 1) } // Paused returns whether sync runs are temporarily disabled. func (s *StateSyncer) Paused() bool { - return s.paused.IsOn() + return atomic.LoadInt32(&s.paused) > 0 } // Resume re-enables sync runs. func (s *StateSyncer) Resume() { - s.paused.Off() - s.SyncChanges.Trigger() + paused := atomic.AddInt32(&s.paused, -1) + if paused < 0 { + panic("unbalanced StateSyncer.Resume() detected") + } + s.triggerSync() } -// toggle implements an on/off switch using methods from the atomic -// package. Since fields in structs that are accessed via -// atomic.Load/Add methods need to be aligned properly on some platforms -// we move that code into a separate struct. -// -// See https://golang.org/pkg/sync/atomic/#pkg-note-BUG for details -type toggle int32 - -func (p *toggle) On() { - atomic.AddInt32((*int32)(p), 1) -} - -func (p *toggle) Off() { - if atomic.AddInt32((*int32)(p), -1) < 0 { - panic("toggle not on") +// triggerSync queues a sync run if one has not been triggered already. 
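The pause counter above replaces the deleted toggle type one-for-one: Pause and Resume nest, and sync runs are suppressed while the count is positive. The "go bug" the new field comment alludes to is presumably the alignment caveat the old toggle documented (https://golang.org/pkg/sync/atomic/#pkg-note-BUG), which is why paused sits first in the struct. A runnable sketch of the same semantics:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// pauser mirrors the nested pause/resume counting used by StateSyncer:
// each Pause increments a counter, each Resume decrements it, and sync
// runs are suppressed while the counter is positive.
type pauser struct {
	// Kept as the first field: per the diff's deleted toggle comment,
	// atomically accessed fields must be properly aligned on some platforms.
	paused int32
}

func (p *pauser) Pause()       { atomic.AddInt32(&p.paused, 1) }
func (p *pauser) Paused() bool { return atomic.LoadInt32(&p.paused) > 0 }

func (p *pauser) Resume() {
	if atomic.AddInt32(&p.paused, -1) < 0 {
		panic("unbalanced Resume() detected")
	}
}

func main() {
	var p pauser
	p.Pause()
	p.Pause() // nested callers may pause independently
	p.Resume()
	fmt.Println(p.Paused()) // true: one Pause is still outstanding
	p.Resume()
	fmt.Println(p.Paused()) // false
}
```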
+func (s *StateSyncer) triggerSync() {
+	select {
+	case s.TriggerCh <- struct{}{}:
+	default:
+	}
 }
-
-func (p *toggle) IsOn() bool {
-	return atomic.LoadInt32((*int32)(p)) > 0
-}
diff --git a/agent/ae/ae_test.go b/agent/ae/ae_test.go
index 3d3449d8b1..7246e7e24a 100644
--- a/agent/ae/ae_test.go
+++ b/agent/ae/ae_test.go
@@ -27,7 +27,7 @@ func TestAE_scaleFactor(t *testing.T) {
 func TestAE_nestedPauseResume(t *testing.T) {
 	t.Parallel()
-	l := NewStateSyner(nil, 0, nil, nil)
+	l := new(StateSyncer)
 	if l.Paused() != false {
 		t.Fatal("syncer should be unPaused after init")
 	}
diff --git a/agent/ae/trigger.go b/agent/ae/trigger.go
deleted file mode 100644
index 1aa5f45862..0000000000
--- a/agent/ae/trigger.go
+++ /dev/null
@@ -1,23 +0,0 @@
-package ae
-
-// Trigger implements a non-blocking event notifier. Events can be
-// triggered without blocking and notifications happen only when the
-// previous event was consumed.
-type Trigger struct {
-	ch chan struct{}
-}
-
-func NewTrigger() *Trigger {
-	return &Trigger{make(chan struct{}, 1)}
-}
-
-func (t Trigger) Trigger() {
-	select {
-	case t.ch <- struct{}{}:
-	default:
-	}
-}
-
-func (t Trigger) Notif() <-chan struct{} {
-	return t.ch
-}
diff --git a/agent/agent.go b/agent/agent.go
index d08307609d..d800002e9f 100644
--- a/agent/agent.go
+++ b/agent/agent.go
@@ -263,12 +263,27 @@ func (a *Agent) Start() error {
 		return fmt.Errorf("Failed to setup node ID: %v", err)
 	}
 
+	// create a notification channel to trigger state synchronizations
+	// when a consul server was added to the cluster.
+	serverUpCh := make(chan struct{}, 1)
+
+	// create a notification channel to trigger state synchronizations
+	// when the state has changed.
+	triggerCh := make(chan struct{}, 1)
+
 	// create the local state
-	a.State = local.NewState(LocalConfig(c), a.logger, a.tokens)
+	a.State = local.NewState(LocalConfig(c), a.logger, a.tokens, triggerCh)
 
 	// create the state synchronization manager which performs
 	// regular and on-demand state synchronizations (anti-entropy).
-	a.sync = ae.NewStateSyner(a.State, c.AEInterval, a.shutdownCh, a.logger)
+	a.sync = &ae.StateSyncer{
+		State:      a.State,
+		Interval:   c.AEInterval,
+		ShutdownCh: a.shutdownCh,
+		ServerUpCh: serverUpCh,
+		TriggerCh:  triggerCh,
+		Logger:     a.logger,
+	}
 
 	// create the config for the rpc server/client
 	consulCfg, err := a.consulConfig()
@@ -279,7 +294,13 @@ func (a *Agent) Start() error {
 	// ServerUp is used to inform that a new consul server is now
 	// up. This can be used to speed up the sync process if we are blocking
 	// waiting to discover a consul server
-	consulCfg.ServerUp = a.sync.SyncFull.Trigger
+	// todo(fs): IMO, the non-blocking nature of this call should be hidden in the syncer
+	consulCfg.ServerUp = func() {
+		select {
+		case serverUpCh <- struct{}{}:
+		default:
+		}
+	}
 
 	// Setup either the client or the server.
 	if c.ServerMode {
 		server, err := consul.NewServerLogger(consulCfg, a.logger)
 		if err != nil {
 			return fmt.Errorf("Failed to start Consul server: %v", err)
 		}
+		a.delegate = server
+		a.State.SetDelegate(server)
+		a.sync.ClusterSize = func() int { return len(server.LANMembers()) }
 	} else {
 		client, err := consul.NewClientLogger(consulCfg, a.logger)
 		if err != nil {
 			return fmt.Errorf("Failed to start Consul client: %v", err)
 		}
+		a.delegate = client
+		a.State.SetDelegate(client)
+		a.sync.ClusterSize = func() int { return len(client.LANMembers()) }
 	}
 
-	// the staggering of the state syncing depends on the cluster size.
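Both triggerCh and serverUpCh (and consulCfg.ServerUp above) rely on the edge-triggered notification idiom that the deleted Trigger type used to encapsulate: a channel of capacity 1 plus a non-blocking send, so producers never block and redundant triggers coalesce into one. A minimal sketch:

```go
package main

import "fmt"

// notify performs a non-blocking send on a buffered channel of size 1.
// Repeated triggers coalesce: if a notification is already pending,
// additional ones are dropped rather than blocking the caller.
func notify(ch chan struct{}) {
	select {
	case ch <- struct{}{}:
	default:
	}
}

func main() {
	ch := make(chan struct{}, 1)

	notify(ch)
	notify(ch) // coalesced; the consumer sees a single event

	n := 0
	for {
		select {
		case <-ch:
			n++
		default:
			fmt.Println("events received:", n) // 1
			return
		}
	}
}
```

The todo(fs) comments flag the cost of inlining it: every producer now has to repeat the select/default dance that Trigger.Trigger() used to hide.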
- a.sync.ClusterSize = func() int { return len(a.delegate.LANMembers()) } - - // link the state with the consul server/client and the state syncer - // via callbacks. After several attempts this was easier than using - // channels since the event notification needs to be non-blocking - // and that should be hidden in the state syncer implementation. - a.State.Delegate = a.delegate - a.State.TriggerSyncChanges = a.sync.SyncChanges.Trigger - // Load checks/services/metadata. if err := a.loadServices(c); err != nil { return err @@ -1299,7 +1316,7 @@ func (a *Agent) WANMembers() []serf.Member { // This is called to prevent a race between clients and the anti-entropy routines func (a *Agent) StartSync() { go a.sync.Run() - a.logger.Printf("[INFO] agent: started state syncer") + a.logger.Printf("[INFO] agent: starting state syncer") } // PauseSync is used to pause anti-entropy while bulk changes are make @@ -2156,7 +2173,8 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig) error { return nil } -// unloadServices will deregister all services. +// unloadServices will deregister all services other than the 'consul' service +// known to the local agent. func (a *Agent) unloadServices() error { for id := range a.State.Services() { if err := a.RemoveService(id, false); err != nil { diff --git a/agent/agent_endpoint_test.go b/agent/agent_endpoint_test.go index 55fd2536ce..d735d3b51a 100644 --- a/agent/agent_endpoint_test.go +++ b/agent/agent_endpoint_test.go @@ -1271,7 +1271,7 @@ func TestAgent_RegisterService_TranslateKeys(t *testing.T) { EnableTagOverride: true, } - if got, want := a.State.Service("test"), svc; !verify.Values(t, "", got, want) { + if got, want := a.state.Services()["test"], svc; !verify.Values(t, "", got, want) { t.Fail() } } diff --git a/agent/catalog_endpoint_test.go b/agent/catalog_endpoint_test.go index 8459291173..fa73dbeee8 100644 --- a/agent/catalog_endpoint_test.go +++ b/agent/catalog_endpoint_test.go @@ -12,6 +12,40 @@ import ( "github.com/hashicorp/serf/coordinate" ) +func TestCatalogRegister(t *testing.T) { + t.Skip("skipping since it is not clear what this test is supposed to verify") + + t.Parallel() + a := NewTestAgent(t.Name(), "") + defer a.Shutdown() + + // Register node + args := &structs.RegisterRequest{ + Node: "foo", + Address: "127.0.0.1", + } + req, _ := http.NewRequest("PUT", "/v1/catalog/register", jsonReader(args)) + obj, err := a.srv.CatalogRegister(nil, req) + if err != nil { + t.Fatalf("err: %v", err) + } + res := obj.(bool) + if res != true { + t.Fatalf("bad: %v", res) + } + + if err := a.State.SyncChanges(); err != nil { + t.Fatal("sync failed: ", err) + } + s := a.State.ServiceState("foo") + if s == nil { + t.Fatal("service 'foo' missing") + } + if !s.InSync { + t.Fatalf("service 'foo' should be in sync") + } +} + func TestCatalogRegister_Service_InvalidAddress(t *testing.T) { t.Parallel() a := NewTestAgent(t.Name(), "") diff --git a/agent/local/state.go b/agent/local/state.go index 91bfc032dd..b980c569f5 100644 --- a/agent/local/state.go +++ b/agent/local/state.go @@ -18,6 +18,9 @@ import ( "github.com/hashicorp/consul/types" ) +// permissionDenied is returned when an ACL based rejection happens. +const permissionDenied = "Permission denied" + // Config is the configuration for the State. It is // populated during NewLocalAgent from the agent configuration to avoid // race conditions with the agent configuration. @@ -36,8 +39,7 @@ type ServiceState struct { // Service is the local copy of the service record. 
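The InSync and Deleted flags on ServiceState (defined just below, and mirrored on CheckState) drive the whole anti-entropy loop. A toy model of the per-record decision, with names simplified; this is an illustration of the bookkeeping, not the actual SyncChanges implementation:

```go
package main

import "fmt"

// serviceState is a trimmed model of local.ServiceState: InSync tracks
// whether the server has the current record, Deleted tombstones a record
// until the server-side deregistration succeeds.
type serviceState struct {
	InSync  bool
	Deleted bool
}

// syncOne mimics the per-record decision a partial sync makes.
func syncOne(id string, s *serviceState, states map[string]*serviceState) {
	switch {
	case s.Deleted:
		// would issue Catalog.Deregister here, then drop the tombstone
		delete(states, id)
		fmt.Println("deregistered", id)
	case !s.InSync:
		// would issue Catalog.Register here
		s.InSync = true
		fmt.Println("synced", id)
	default:
		fmt.Println("in sync", id)
	}
}

func main() {
	states := map[string]*serviceState{
		"redis": {InSync: false},
		"web":   {InSync: false, Deleted: true},
	}
	for id, s := range states {
		syncOne(id, s, states)
	}
}
```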
Service *structs.NodeService - // Token is the ACL to update or delete the service record on the - // server. + // Token is the ACL to update the service record on the server. Token string // InSync contains whether the local state of the service record @@ -62,8 +64,8 @@ type CheckState struct { // Check is the local copy of the health check record. Check *structs.HealthCheck - // Token is the ACL record to update or delete the health check - // record on the server. + // Token is the ACL record to update the health check record + // on the server. Token string // CriticalTime is the last time the health check status went @@ -72,8 +74,8 @@ type CheckState struct { CriticalTime time.Time // DeferCheck is used to delay the sync of a health check when - // only the output has changed. This rate limits changes which - // do not affect the state of the node and/or service. + // only the status has changed. + // todo(fs): ^^ this needs double checking... DeferCheck *time.Timer // InSync contains whether the local state of the health check @@ -105,7 +107,7 @@ func (c *CheckState) CriticalFor() time.Duration { return time.Since(c.CriticalTime) } -type rpc interface { +type delegate interface { RPC(method string, args interface{}, reply interface{}) error } @@ -114,25 +116,14 @@ type rpc interface { // catalog representation type State struct { sync.RWMutex - - // Delegate the RPC interface to the consul server or agent. - // - // It is set after both the state and the consul server/agent have - // been created. - Delegate rpc - - // TriggerSyncChanges is used to notify the state syncer that a - // partial sync should be performed. - // - // It is set after both the state and the state syncer have been - // created. - TriggerSyncChanges func() - logger *log.Logger // Config is the agent config config Config + // delegate is the consul interface to use for keeping in sync + delegate delegate + // nodeInfoInSync tracks whether the server has our correct top-level // node information in sync nodeInfoInSync bool @@ -143,9 +134,13 @@ type State struct { // Checks tracks the local checks checks map[types.CheckID]*CheckState - // metadata tracks the node metadata fields + // metadata tracks the local metadata fields metadata map[string]string + // triggerCh is used to inform of a change to local state + // that requires anti-entropy with the server + triggerCh chan struct{} + // discardCheckOutput stores whether the output of health checks // is stored in the raft log. 
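Narrowing the server/client dependency to the one-method delegate interface above is what lets the local state be exercised without a running cluster; anything RPC-shaped will do. A sketch, where fakeDelegate is hypothetical and only for illustration:

```go
package main

import "fmt"

// delegate is the single-method surface the local state needs from either
// a consul server or client: a blocking RPC call.
type delegate interface {
	RPC(method string, args interface{}, reply interface{}) error
}

// fakeDelegate records the RPC methods invoked; useful in tests where no
// server is running.
type fakeDelegate struct {
	calls []string
}

func (f *fakeDelegate) RPC(method string, args interface{}, reply interface{}) error {
	f.calls = append(f.calls, method)
	return nil
}

func main() {
	var d delegate = &fakeDelegate{}
	var out struct{}
	_ = d.RPC("Catalog.Register", nil, &out)
	fmt.Println(d.(*fakeDelegate).calls) // [Catalog.Register]
}
```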
discardCheckOutput atomic.Value // bool @@ -155,19 +150,33 @@ type State struct { } // NewLocalState creates a is used to initialize the local state -func NewState(c Config, lg *log.Logger, tokens *token.Store) *State { +func NewState(c Config, lg *log.Logger, tokens *token.Store, triggerCh chan struct{}) *State { l := &State{ - config: c, - logger: lg, - services: make(map[string]*ServiceState), - checks: make(map[types.CheckID]*CheckState), - metadata: make(map[string]string), - tokens: tokens, + config: c, + logger: lg, + services: make(map[string]*ServiceState), + checks: make(map[types.CheckID]*CheckState), + metadata: make(map[string]string), + triggerCh: triggerCh, + tokens: tokens, } - l.SetDiscardCheckOutput(c.DiscardCheckOutput) + l.discardCheckOutput.Store(c.DiscardCheckOutput) return l } +func (l *State) SetDelegate(d delegate) { + l.delegate = d +} + +// changeMade is used to trigger an anti-entropy run +func (l *State) changeMade() { + // todo(fs): IMO, the non-blocking nature of this call should be hidden in the syncer + select { + case l.triggerCh <- struct{}{}: + default: + } +} + func (l *State) SetDiscardCheckOutput(b bool) { l.discardCheckOutput.Store(b) } @@ -195,12 +204,14 @@ func (l *State) serviceToken(id string) string { // AddService is used to add a service entry to the local state. // This entry is persistent and the agent will make a best effort to // ensure it is registered +// todo(fs): where is the persistence happening? func (l *State) AddService(service *structs.NodeService, token string) error { if service == nil { return fmt.Errorf("no service") } // use the service name as id if the id was omitted + // todo(fs): is this for backwards compatibility? if service.ID == "" { service.ID = service.Service } @@ -217,7 +228,7 @@ func (l *State) AddServiceState(s *ServiceState) { defer l.Unlock() l.services[s.Service.ID] = s - l.TriggerSyncChanges() + l.changeMade() } // RemoveService is used to remove a service entry from the local state. @@ -236,7 +247,7 @@ func (l *State) RemoveService(id string) error { // entry around until it is actually removed. s.InSync = false s.Deleted = true - l.TriggerSyncChanges() + l.changeMade() return nil } @@ -355,7 +366,7 @@ func (l *State) AddCheckState(c *CheckState) { defer l.Unlock() l.checks[c.Check.CheckID] = c - l.TriggerSyncChanges() + l.changeMade() } // RemoveCheck is used to remove a health check from the local state. @@ -376,7 +387,7 @@ func (l *State) RemoveCheck(id types.CheckID) error { // entry around until it is actually removed. 
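One detail worth isolating from AddService above: a nil service is rejected, and an empty ID falls back to the service name, which is why single-instance services can be registered without an explicit ID. Sketched on its own, with nodeService as a stand-in for structs.NodeService:

```go
package main

import (
	"errors"
	"fmt"
)

type nodeService struct {
	ID      string
	Service string
}

// addService mirrors the normalization AddService performs: reject a nil
// service and default a missing ID to the service name.
func addService(svc *nodeService) error {
	if svc == nil {
		return errors.New("no service")
	}
	if svc.ID == "" {
		svc.ID = svc.Service
	}
	return nil
}

func main() {
	svc := &nodeService{Service: "redis"}
	if err := addService(svc); err != nil {
		panic(err)
	}
	fmt.Println(svc.ID) // "redis"
}
```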
c.InSync = false c.Deleted = true - l.TriggerSyncChanges() + l.changeMade() return nil } @@ -432,7 +443,7 @@ func (l *State) UpdateCheck(id types.CheckID, status, output string) { return } c.InSync = false - l.TriggerSyncChanges() + l.changeMade() }) } return @@ -442,7 +453,7 @@ func (l *State) UpdateCheck(id types.CheckID, status, output string) { c.Check.Status = status c.Check.Output = output c.InSync = false - l.TriggerSyncChanges() + l.changeMade() } // Check returns the locally registered check that the @@ -538,12 +549,12 @@ func (l *State) updateSyncState() error { } var out1 structs.IndexedNodeServices - if err := l.Delegate.RPC("Catalog.NodeServices", &req, &out1); err != nil { + if err := l.delegate.RPC("Catalog.NodeServices", &req, &out1); err != nil { return err } var out2 structs.IndexedHealthChecks - if err := l.Delegate.RPC("Health.NodeChecks", &req, &out2); err != nil { + if err := l.delegate.RPC("Health.NodeChecks", &req, &out2); err != nil { return err } @@ -594,7 +605,8 @@ func (l *State) updateSyncState() error { continue } - // If the service is already scheduled for removal skip it + // If the service is scheduled for removal skip it. + // todo(fs): is this correct? if ls.Deleted { continue } @@ -634,7 +646,8 @@ func (l *State) updateSyncState() error { continue } - // If the check is already scheduled for removal skip it. + // If the check is scheduled for removal skip it. + // todo(fs): is this correct? if lc.Deleted { continue } @@ -674,13 +687,10 @@ func (l *State) updateSyncState() error { func (l *State) SyncFull() error { // note that we do not acquire the lock here since the methods // we are calling will do that themself. - // - // Also note that we don't hold the lock for the entire operation - // but release it between the two calls. This is not an issue since - // the algorithm is best-effort to achieve eventual consistency. - // SyncChanges will sync whatever updateSyncState() has determined - // needs updating. + // todo(fs): is it an issue that we do not hold the lock for the entire time? + // todo(fs): IMO, this doesn't matter since SyncChanges will sync whatever + // todo(fs): was determined in the update step. if err := l.updateSyncState(); err != nil { return err } @@ -754,7 +764,7 @@ func (l *State) LoadMetadata(data map[string]string) error { for k, v := range data { l.metadata[k] = v } - l.TriggerSyncChanges() + l.changeMade() return nil } @@ -805,24 +815,19 @@ func (l *State) deleteService(id string) error { WriteRequest: structs.WriteRequest{Token: l.serviceToken(id)}, } var out struct{} - err := l.Delegate.RPC("Catalog.Deregister", &req, &out) - switch { - case err == nil || strings.Contains(err.Error(), "Unknown service"): + err := l.delegate.RPC("Catalog.Deregister", &req, &out) + if err == nil || strings.Contains(err.Error(), "Unknown service") { delete(l.services, id) - l.logger.Printf("[INFO] agent: Deregistered service %q", id) + l.logger.Printf("[INFO] agent: Deregistered service '%s'", id) return nil - - case acl.IsErrPermissionDenied(err): - // todo(fs): mark the service to be in sync to prevent excessive retrying before next full sync - // todo(fs): some backoff strategy might be a better solution - l.services[id].InSync = true - l.logger.Printf("[WARN] agent: Service %q deregistration blocked by ACLs", id) - return nil - - default: - l.logger.Printf("[WARN] agent: Deregistering service %q failed. %s", id, err) - return err } + if acl.IsErrPermissionDenied(err) { + // todo(fs): why is the service in sync here? 
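The DeferCheck timer in UpdateCheck above acts as a rate limiter: when only a check's output changes, the out-of-sync mark is postponed by a timer rather than applied immediately, so chatty checks do not force a sync on every run. A self-contained sketch of the pattern, with the delay handling simplified; per the tests below, the real interval is derived from check_update_interval:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// deferredMark postpones the out-of-sync mark for output-only changes.
// Multiple changes within the window coalesce into one deferred trigger.
type deferredMark struct {
	mu      sync.Mutex
	inSync  bool
	timer   *time.Timer
	trigger func() // notifies the syncer, e.g. via a trigger channel
}

func (d *deferredMark) outputChanged(delay time.Duration) {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.timer != nil {
		return // a deferred sync is already scheduled
	}
	d.timer = time.AfterFunc(delay, func() {
		d.mu.Lock()
		d.inSync = false
		d.timer = nil
		d.mu.Unlock()
		d.trigger()
	})
}

func main() {
	done := make(chan struct{})
	d := &deferredMark{inSync: true, trigger: func() { close(done) }}
	d.outputChanged(10 * time.Millisecond)
	d.outputChanged(10 * time.Millisecond) // coalesced with the first
	<-done
	fmt.Println("inSync:", d.inSync) // false, one deferred trigger fired
}
```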
+ l.services[id].InSync = true + l.logger.Printf("[WARN] agent: Service '%s' deregistration blocked by ACLs", id) + return nil + } + return err } // deleteCheck is used to delete a check from the server @@ -838,28 +843,20 @@ func (l *State) deleteCheck(id types.CheckID) error { WriteRequest: structs.WriteRequest{Token: l.checkToken(id)}, } var out struct{} - err := l.Delegate.RPC("Catalog.Deregister", &req, &out) - switch { - case err == nil || strings.Contains(err.Error(), "Unknown check"): - c := l.checks[id] - if c != nil && c.DeferCheck != nil { - c.DeferCheck.Stop() - } + err := l.delegate.RPC("Catalog.Deregister", &req, &out) + if err == nil || strings.Contains(err.Error(), "Unknown check") { + // todo(fs): do we need to stop the deferCheck timer here? delete(l.checks, id) - l.logger.Printf("[INFO] agent: Deregistered check %q", id) + l.logger.Printf("[INFO] agent: Deregistered check '%s'", id) return nil - - case acl.IsErrPermissionDenied(err): - // todo(fs): mark the check to be in sync to prevent excessive retrying before next full sync - // todo(fs): some backoff strategy might be a better solution - l.checks[id].InSync = true - l.logger.Printf("[WARN] agent: Check %q deregistration blocked by ACLs", id) - return nil - - default: - l.logger.Printf("[WARN] agent: Deregistering check %q failed. %s", id, err) - return err } + if acl.IsErrPermissionDenied(err) { + // todo(fs): why is the check in sync here? + l.checks[id].InSync = true + l.logger.Printf("[WARN] agent: Check '%s' deregistration blocked by ACLs", id) + return nil + } + return err } // syncService is used to sync a service to the server @@ -903,9 +900,8 @@ func (l *State) syncService(id string) error { } var out struct{} - err := l.Delegate.RPC("Catalog.Register", &req, &out) - switch { - case err == nil: + err := l.delegate.RPC("Catalog.Register", &req, &out) + if err == nil { l.services[id].InSync = true // Given how the register API works, this info is also updated // every time we sync a service. @@ -913,23 +909,20 @@ func (l *State) syncService(id string) error { for _, check := range checks { l.checks[check.CheckID].InSync = true } - l.logger.Printf("[INFO] agent: Synced service %q", id) + l.logger.Printf("[INFO] agent: Synced service '%s'", id) return nil - - case acl.IsErrPermissionDenied(err): - // todo(fs): mark the service and the checks to be in sync to prevent excessive retrying before next full sync - // todo(fs): some backoff strategy might be a better solution + } + if acl.IsErrPermissionDenied(err) { + // todo(fs): why are the service and the checks in sync here? + // todo(fs): why is the node info not in sync here? l.services[id].InSync = true for _, check := range checks { l.checks[check.CheckID].InSync = true } - l.logger.Printf("[WARN] agent: Service %q registration blocked by ACLs", id) + l.logger.Printf("[WARN] agent: Service '%s' registration blocked by ACLs", id) return nil - - default: - l.logger.Printf("[WARN] agent: Syncing service %q failed. %s", id, err) - return err } + return err } // syncCheck is used to sync a check to the server @@ -954,27 +947,22 @@ func (l *State) syncCheck(id types.CheckID) error { } var out struct{} - err := l.Delegate.RPC("Catalog.Register", &req, &out) - switch { - case err == nil: + err := l.delegate.RPC("Catalog.Register", &req, &out) + if err == nil { l.checks[id].InSync = true // Given how the register API works, this info is also updated // every time we sync a check. 
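The ACL branches follow one pattern throughout deleteService, deleteCheck, syncService, syncCheck, and syncNodeInfo, and the removed comments spell out the rationale the new todos ask about: a permission-denied record is marked in sync because retrying cannot succeed until ACLs change, so the retry is deferred to the next full sync instead of spamming the server. A sketch of that triage, matching errors against the permissionDenied string constant added earlier in this file:

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

const permissionDenied = "Permission denied"

// handleRegisterResult sketches the error triage each sync helper performs:
// success and ACL rejections both mark the record in sync (an ACL denial
// will not heal by retrying before the next full sync); anything else is
// surfaced to the caller for retry.
func handleRegisterResult(err error, markInSync func()) error {
	switch {
	case err == nil:
		markInSync()
		return nil
	case strings.Contains(err.Error(), permissionDenied):
		markInSync()
		fmt.Println("[WARN] blocked by ACLs")
		return nil
	default:
		return err
	}
}

func main() {
	inSync := false
	_ = handleRegisterResult(errors.New(permissionDenied), func() { inSync = true })
	fmt.Println("inSync:", inSync) // true: suppressed until the next full sync
}
```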
l.nodeInfoInSync = true - l.logger.Printf("[INFO] agent: Synced check %q", id) + l.logger.Printf("[INFO] agent: Synced check '%s'", id) return nil - - case acl.IsErrPermissionDenied(err): - // todo(fs): mark the check to be in sync to prevent excessive retrying before next full sync - // todo(fs): some backoff strategy might be a better solution - l.checks[id].InSync = true - l.logger.Printf("[WARN] agent: Check %q registration blocked by ACLs", id) - return nil - - default: - l.logger.Printf("[WARN] agent: Syncing check %q failed. %s", id, err) - return err } + if acl.IsErrPermissionDenied(err) { + // todo(fs): why is the check in sync here? + l.checks[id].InSync = true + l.logger.Printf("[WARN] agent: Check '%s' registration blocked by ACLs", id) + return nil + } + return err } func (l *State) syncNodeInfo() error { @@ -988,22 +976,17 @@ func (l *State) syncNodeInfo() error { WriteRequest: structs.WriteRequest{Token: l.tokens.AgentToken()}, } var out struct{} - err := l.Delegate.RPC("Catalog.Register", &req, &out) - switch { - case err == nil: + err := l.delegate.RPC("Catalog.Register", &req, &out) + if err == nil { l.nodeInfoInSync = true l.logger.Printf("[INFO] agent: Synced node info") return nil - - case acl.IsErrPermissionDenied(err): - // todo(fs): mark the node info to be in sync to prevent excessive retrying before next full sync - // todo(fs): some backoff strategy might be a better solution + } + if acl.IsErrPermissionDenied(err) { + // todo(fs): why is the node info in sync here? l.nodeInfoInSync = true l.logger.Printf("[WARN] agent: Node info update blocked by ACLs") return nil - - default: - l.logger.Printf("[WARN] agent: Syncing node info failed. %s", err) - return err } + return err } diff --git a/agent/local/state_test.go b/agent/local/state_test.go index b092998aa6..8ea69698a3 100644 --- a/agent/local/state_test.go +++ b/agent/local/state_test.go @@ -1011,18 +1011,16 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) { func TestAgent_UpdateCheck_DiscardOutput(t *testing.T) { t.Parallel() - a := agent.NewTestAgent(t.Name(), ` + a := NewTestAgent(t.Name(), ` discard_check_output = true check_update_interval = "0s" # set to "0s" since otherwise output checks are deferred `) defer a.Shutdown() inSync := func(id string) bool { - s := a.State.CheckState(types.CheckID(id)) - if s == nil { - return false - } - return s.InSync + a.state.Lock() + defer a.state.Unlock() + return a.state.checkStatus[types.CheckID(id)].inSync } // register a check @@ -1033,7 +1031,7 @@ func TestAgent_UpdateCheck_DiscardOutput(t *testing.T) { Status: api.HealthPassing, Output: "first output", } - if err := a.State.AddCheck(check, ""); err != nil { + if err := a.state.AddCheck(check, ""); err != nil { t.Fatalf("bad: %s", err) } @@ -1047,15 +1045,15 @@ func TestAgent_UpdateCheck_DiscardOutput(t *testing.T) { // update the check with the same status but different output // and the check should still be in sync. - a.State.UpdateCheck(check.CheckID, api.HealthPassing, "second output") + a.state.UpdateCheck(check.CheckID, api.HealthPassing, "second output") if !inSync("web") { t.Fatal("check should be in sync") } // disable discarding of check output and update the check again with different // output. Then the check should be out of sync. 
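The nodeInfoInSync flip inside syncService and syncCheck leans on the comment above: a Catalog.Register request always carries the node's identity, so any successful service or check sync refreshes the node info as a side effect. Modeled minimally:

```go
package main

import "fmt"

// syncState sketches why nodeInfoInSync is set inside syncCheck: the same
// Catalog.Register call that registers the check also carries the node's
// top-level information.
type syncState struct {
	nodeInfoInSync bool
	checksInSync   map[string]bool
}

func (s *syncState) syncCheck(id string) {
	// (would issue Catalog.Register with both node and check fields here)
	s.checksInSync[id] = true
	s.nodeInfoInSync = true // piggybacks on the same register call
}

func main() {
	s := &syncState{checksInSync: map[string]bool{}}
	s.syncCheck("mem")
	fmt.Println(s.nodeInfoInSync) // true, without a separate node-info sync
}
```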
- a.State.SetDiscardCheckOutput(false) - a.State.UpdateCheck(check.CheckID, api.HealthPassing, "third output") + a.state.SetDiscardCheckOutput(false) + a.state.UpdateCheck(check.CheckID, api.HealthPassing, "third output") if inSync("web") { t.Fatal("check should be out of sync") } @@ -1318,9 +1316,8 @@ func TestAgent_ServiceTokens(t *testing.T) { tokens := new(token.Store) tokens.UpdateUserToken("default") - cfg := config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`) - l := local.NewState(agent.LocalConfig(cfg), nil, tokens) - l.TriggerSyncChanges = func() {} + lcfg := agent.LocalConfig(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`)) + l := local.NewState(lcfg, nil, tokens, make(chan struct{}, 1)) l.AddService(&structs.NodeService{ID: "redis"}, "") @@ -1347,9 +1344,8 @@ func TestAgent_CheckTokens(t *testing.T) { tokens := new(token.Store) tokens.UpdateUserToken("default") - cfg := config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`) - l := local.NewState(agent.LocalConfig(cfg), nil, tokens) - l.TriggerSyncChanges = func() {} + lcfg := agent.LocalConfig(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`)) + l := local.NewState(lcfg, nil, tokens, make(chan struct{}, 1)) // Returns default when no token is set l.AddCheck(&structs.HealthCheck{CheckID: types.CheckID("mem")}, "") @@ -1372,9 +1368,8 @@ func TestAgent_CheckTokens(t *testing.T) { func TestAgent_CheckCriticalTime(t *testing.T) { t.Parallel() - cfg := config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`) - l := local.NewState(agent.LocalConfig(cfg), nil, new(token.Store)) - l.TriggerSyncChanges = func() {} + lcfg := agent.LocalConfig(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`)) + l := local.NewState(lcfg, nil, new(token.Store), make(chan struct{}, 1)) svc := &structs.NodeService{ID: "redis", Service: "redis", Port: 8000} l.AddService(svc, "") @@ -1436,9 +1431,8 @@ func TestAgent_CheckCriticalTime(t *testing.T) { func TestAgent_AddCheckFailure(t *testing.T) { t.Parallel() - cfg := config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`) - l := local.NewState(agent.LocalConfig(cfg), nil, new(token.Store)) - l.TriggerSyncChanges = func() {} + lcfg := agent.LocalConfig(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`)) + l := local.NewState(lcfg, nil, new(token.Store), make(chan struct{}, 1)) // Add a check for a service that does not exist and verify that it fails checkID := types.CheckID("redis:1")
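Both token tests above exercise the same fallback that serviceToken and checkToken implement: an explicit per-service or per-check ACL token wins, otherwise the agent's user token (here "default") is used. A sketch of that lookup, where tokenStore is a stand-in and not the real token.Store API:

```go
package main

import "fmt"

// tokenStore models the ACL token fallback: an explicit per-entity token
// takes precedence over the agent-wide user token.
type tokenStore struct {
	userToken string
	perEntity map[string]string
}

func (t *tokenStore) token(id string) string {
	if tok, ok := t.perEntity[id]; ok && tok != "" {
		return tok
	}
	return t.userToken
}

func main() {
	t := &tokenStore{
		userToken: "default",
		perEntity: map[string]string{"redis": "abc123"},
	}
	fmt.Println(t.token("redis")) // abc123: explicit token wins
	fmt.Println(t.token("web"))   // default: falls back to the user token
}
```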