diff --git a/agent/ae/ae.go b/agent/ae/ae.go index f3b6745036..78bf93e251 100644 --- a/agent/ae/ae.go +++ b/agent/ae/ae.go @@ -93,67 +93,66 @@ func (s *StateSyncer) Run() { Sync: for { - switch err := s.State.UpdateSyncState(); { + // update the sync status + err := s.State.UpdateSyncState() + if err == nil { + break + } - // update sync status failed - case err != nil: - s.Logger.Printf("[ERR] agent: failed to sync remote state: %v", err) + s.Logger.Printf("[ERR] agent: failed to sync remote state: %v", err) - // retry updating sync status after some time or when a consul - // server was added. + // retry updating sync status after some time or when a consul + // server was added. + select { + + // consul server added to cluster. + // retry sooner than retryFailIntv to converge cluster quicker + // but stagger delay to avoid thundering herd + case <-s.ServerUpCh: select { - - // consul server added to cluster. - // retry sooner than retryFailIntv to converge cluster sooner - // but stagger delay to avoid thundering herd - case <-s.ServerUpCh: - select { - case <-time.After(stagger(serverUpIntv)): - case <-s.ShutdownCh: - return - } - - // retry full sync after some time - // todo(fs): why don't we use s.Interval here? - case <-time.After(retryFailIntv + stagger(retryFailIntv)): - + case <-time.After(stagger(serverUpIntv)): case <-s.ShutdownCh: return } - // update sync status OK - default: - // force-trigger sync to pickup any changes - s.triggerSync() + // retry full sync after some time + // todo(fs): why don't we use s.Interval here? + case <-time.After(retryFailIntv + stagger(retryFailIntv)): - // do partial syncs until it is time for a full sync again - for { - select { - // todo(fs): why don't we honor the ServerUpCh here as well? - // todo(fs): by default, s.Interval is 60s which is >> 3s (serverUpIntv) - // case <-s.ServerUpCh: - // select { - // case <-time.After(stagger(serverUpIntv)): - // continue Sync - // case <-s.ShutdownCh: - // return - // } + case <-s.ShutdownCh: + return + } + } - case <-time.After(s.Interval + stagger(s.Interval)): - continue Sync + // Force-trigger sync to pickup any changes + s.triggerSync() - case <-s.TriggerCh: - if s.Paused() { - continue - } - if err := s.State.SyncChanges(); err != nil { - s.Logger.Printf("[ERR] agent: failed to sync changes: %v", err) - } + // Wait for sync events + for { + select { + // todo(fs): why don't we honor the ServerUpCh here as well? + // todo(fs): by default, s.Interval is 60s which is >> 3s (serverUpIntv) + // case <-s.ServerUpCh: + // select { + // case <-time.After(stagger(serverUpIntv)): + // continue Sync + // case <-s.ShutdownCh: + // return + // } - case <-s.ShutdownCh: - return - } + case <-time.After(s.Interval + stagger(s.Interval)): + goto Sync + + case <-s.TriggerCh: + if s.Paused() { + continue } + if err := s.State.SyncChanges(); err != nil { + s.Logger.Printf("[ERR] agent: failed to sync changes: %v", err) + } + + case <-s.ShutdownCh: + return } } }