diff --git a/agent/ae/ae.go b/agent/ae/ae.go
index e36016fc65..3c986b5ac1 100644
--- a/agent/ae/ae.go
+++ b/agent/ae/ae.go
@@ -96,6 +96,10 @@ type StateSyncer struct {
 	// syncChangesEvent generates an event based on multiple conditions
 	// when the state machine is performing partial state syncs.
 	syncChangesEvent func() event
+
+	// nextFullSyncCh is a chan that receives a time.Time when the next
+	// full sync should occur.
+	nextFullSyncCh <-chan time.Time
 }
 
 const (
@@ -148,6 +152,7 @@ func (s *StateSyncer) Run() {
 	if s.ClusterSize == nil {
 		panic("ClusterSize not set")
 	}
+	s.resetNextFullSyncCh()
 	s.runFSM(fullSyncState, s.nextFSMState)
 }
 
@@ -245,8 +250,9 @@ func (s *StateSyncer) retrySyncFullEventFn() event {
 	}
 
 	// retry full sync after some time
-	// todo(fs): why don't we use s.Interval here?
+	// use retryFailInterval here because this case retries a failed sync
 	case <-time.After(s.retryFailInterval + s.stagger(s.retryFailInterval)):
+		s.resetNextFullSyncCh()
 		return syncFullTimerEvent
 
 	case <-s.ShutdownCh:
@@ -266,13 +272,15 @@ func (s *StateSyncer) syncChangesEventFn() event {
 	case <-s.SyncFull.Notif():
 		select {
 		case <-time.After(s.stagger(s.serverUpInterval)):
+			s.resetNextFullSyncCh()
 			return syncFullNotifEvent
 		case <-s.ShutdownCh:
 			return shutdownEvent
 		}
 
 	// time for a full sync again
-	case <-time.After(s.Interval + s.stagger(s.Interval)):
+	case <-s.nextFullSyncCh:
+		s.resetNextFullSyncCh()
 		return syncFullTimerEvent
 
 	// do partial syncs on demand
@@ -284,6 +292,16 @@ func (s *StateSyncer) syncChangesEventFn() event {
 	}
 }
 
+// resetNextFullSyncCh resets nextFullSyncCh to fire after Interval plus a
+// stagger. Call this every time a full sync is performed.
+func (s *StateSyncer) resetNextFullSyncCh() {
+	if s.stagger != nil {
+		s.nextFullSyncCh = time.After(s.Interval + s.stagger(s.Interval))
+	} else {
+		s.nextFullSyncCh = time.After(s.Interval)
+	}
+}
+
 // stubbed out for testing
 var libRandomStagger = lib.RandomStagger
 
diff --git a/agent/ae/ae_test.go b/agent/ae/ae_test.go
index 0aa7e1a0e4..55d983e752 100644
--- a/agent/ae/ae_test.go
+++ b/agent/ae/ae_test.go
@@ -400,5 +400,6 @@ func testSyncer(t *testing.T) *StateSyncer {
 	l := NewStateSyncer(nil, time.Second, nil, logger)
 	l.stagger = func(d time.Duration) time.Duration { return d }
 	l.ClusterSize = func() int { return 1 }
+	l.resetNextFullSyncCh()
 	return l
 }
diff --git a/agent/local/state.go b/agent/local/state.go
index 399fe61ea3..026e2550a0 100644
--- a/agent/local/state.go
+++ b/agent/local/state.go
@@ -1023,6 +1023,15 @@ func (l *State) SyncChanges() error {
 	l.Lock()
 	defer l.Unlock()
 
+	// Sync the node-level info first if we need to.
+	if l.nodeInfoInSync {
+		l.logger.Debug("Node info in sync")
+	} else {
+		if err := l.syncNodeInfo(); err != nil {
+			return err
+		}
+	}
+
 	// We will do node-level info syncing at the end, since it will get
 	// updated by a service or check sync anyway, given how the register
 	// API works.
@@ -1064,14 +1073,7 @@ func (l *State) SyncChanges() error {
 			return err
 		}
 	}
-
-	// Now sync the node level info if we need to, and didn't do any of
-	// the other sync operations.
-	if l.nodeInfoInSync {
-		l.logger.Debug("Node info in sync")
-		return nil
-	}
-	return l.syncNodeInfo()
+	return nil
 }
 
 // deleteService is used to delete a service from the server
@@ -1204,6 +1206,7 @@ func (l *State) syncService(key structs.ServiceID) error {
 		Service:        l.services[key].Service,
 		EnterpriseMeta: key.EnterpriseMeta,
 		WriteRequest:   structs.WriteRequest{Token: st},
+		SkipNodeUpdate: l.nodeInfoInSync,
 	}
 
 	// Backwards-compatibility for Consul < 0.5
@@ -1266,6 +1269,7 @@ func (l *State) syncCheck(key structs.CheckID) error {
 		Check:          c.Check,
 		EnterpriseMeta: c.Check.EnterpriseMeta,
 		WriteRequest:   structs.WriteRequest{Token: ct},
+		SkipNodeUpdate: l.nodeInfoInSync,
 	}
 
 	var serviceKey structs.ServiceID
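Reviewer note on the ae.go hunks: the inline time.After(s.Interval + s.stagger(s.Interval)) inside the select was re-armed on every pass through the event loop, so a steady stream of partial-sync notifications could postpone the full sync indefinitely. Storing the channel in nextFullSyncCh and re-arming it only when a full sync actually runs removes that starvation. A standalone sketch of the pattern (illustrative names and durations, not Consul code):

package main

import (
	"fmt"
	"time"
)

func main() {
	// Stand-in for partial-sync notifications arriving faster than the
	// full-sync interval.
	events := time.Tick(30 * time.Millisecond)

	// Patched pattern: arm the timer channel once, outside the loop, and
	// re-arm it only after the full-sync case fires (resetNextFullSyncCh).
	fullSync := time.After(100 * time.Millisecond)

	for start := time.Now(); time.Since(start) < 350*time.Millisecond; {
		select {
		case <-events:
			fmt.Println("partial sync")
		case <-fullSync:
			// Reached despite the constant event stream. With the old
			//   case <-time.After(100 * time.Millisecond):
			// a fresh timer would be created on every iteration, so this
			// branch could never win the select.
			fmt.Println("full sync")
			fullSync = time.After(100 * time.Millisecond)
		}
	}
}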
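The state.go hunks are complementary: node info is now synced once at the top of SyncChanges, and the service/check register requests set SkipNodeUpdate from l.nodeInfoInSync, presumably so those calls no longer rewrite node-level info that was just synced. A hypothetical miniature of that flow (toy types, not the real structs.RegisterRequest):

package main

import "fmt"

type registerReq struct {
	name           string
	skipNodeUpdate bool // mirrors structs.RegisterRequest.SkipNodeUpdate
}

type state struct {
	nodeInfoInSync bool
}

func (s *state) syncNodeInfo() {
	fmt.Println("node info synced")
	s.nodeInfoInSync = true
}

func (s *state) syncService(name string) {
	// With nodeInfoInSync already true, the register request tells the
	// server not to rewrite node-level info on every service sync.
	req := registerReq{name: name, skipNodeUpdate: s.nodeInfoInSync}
	fmt.Printf("register %s (skipNodeUpdate=%v)\n", req.name, req.skipNodeUpdate)
}

func main() {
	s := &state{}
	if !s.nodeInfoInSync {
		s.syncNodeInfo() // node info first, as in the patched SyncChanges
	}
	s.syncService("web")
	s.syncService("db")
}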