Revert "agent: refactor sync loop to linear flow of control"

This reverts commit 7a2af206ea964fc0846f9b80c10ea9d91cb3c99e.
This commit is contained in:
Frank Schroeder 2017-10-23 10:08:35 +02:00
parent 91569a7ceb
commit 26a155eb41
No known key found for this signature in database
GPG Key ID: 4D65C6EAEC87DECD

View File

@ -93,67 +93,66 @@ func (s *StateSyncer) Run() {
Sync: Sync:
for { for {
switch err := s.State.UpdateSyncState(); { // update the sync status
err := s.State.UpdateSyncState()
if err == nil {
break
}
// update sync status failed s.Logger.Printf("[ERR] agent: failed to sync remote state: %v", err)
case err != nil:
s.Logger.Printf("[ERR] agent: failed to sync remote state: %v", err)
// retry updating sync status after some time or when a consul // retry updating sync status after some time or when a consul
// server was added. // server was added.
select {
// consul server added to cluster.
// retry sooner than retryFailIntv to converge cluster quicker
// but stagger delay to avoid thundering herd
case <-s.ServerUpCh:
select { select {
case <-time.After(stagger(serverUpIntv)):
// consul server added to cluster.
// retry sooner than retryFailIntv to converge cluster sooner
// but stagger delay to avoid thundering herd
case <-s.ServerUpCh:
select {
case <-time.After(stagger(serverUpIntv)):
case <-s.ShutdownCh:
return
}
// retry full sync after some time
// todo(fs): why don't we use s.Interval here?
case <-time.After(retryFailIntv + stagger(retryFailIntv)):
case <-s.ShutdownCh: case <-s.ShutdownCh:
return return
} }
// update sync status OK // retry full sync after some time
default: // todo(fs): why don't we use s.Interval here?
// force-trigger sync to pickup any changes case <-time.After(retryFailIntv + stagger(retryFailIntv)):
s.triggerSync()
// do partial syncs until it is time for a full sync again case <-s.ShutdownCh:
for { return
select { }
// todo(fs): why don't we honor the ServerUpCh here as well? }
// todo(fs): by default, s.Interval is 60s which is >> 3s (serverUpIntv)
// case <-s.ServerUpCh:
// select {
// case <-time.After(stagger(serverUpIntv)):
// continue Sync
// case <-s.ShutdownCh:
// return
// }
case <-time.After(s.Interval + stagger(s.Interval)): // Force-trigger sync to pickup any changes
continue Sync s.triggerSync()
case <-s.TriggerCh: // Wait for sync events
if s.Paused() { for {
continue select {
} // todo(fs): why don't we honor the ServerUpCh here as well?
if err := s.State.SyncChanges(); err != nil { // todo(fs): by default, s.Interval is 60s which is >> 3s (serverUpIntv)
s.Logger.Printf("[ERR] agent: failed to sync changes: %v", err) // case <-s.ServerUpCh:
} // select {
// case <-time.After(stagger(serverUpIntv)):
// continue Sync
// case <-s.ShutdownCh:
// return
// }
case <-s.ShutdownCh: case <-time.After(s.Interval + stagger(s.Interval)):
return goto Sync
}
case <-s.TriggerCh:
if s.Paused() {
continue
} }
if err := s.State.SyncChanges(); err != nil {
s.Logger.Printf("[ERR] agent: failed to sync changes: %v", err)
}
case <-s.ShutdownCh:
return
} }
} }
} }